Go语言快速入门Go语言快速入门
首页
基础篇
进阶篇
高阶篇
实战篇
Go官方网站
编程指南
首页
基础篇
进阶篇
高阶篇
实战篇
Go官方网站
编程指南
  • 进阶篇

    • 🚀 进阶篇
    • 方法
    • 接口
    • 错误处理
    • Goroutine
    • Channel
    • 包管理
    • 单元测试

Goroutine

Goroutine是Go的杀手锉!超轻量级的并发,比线程轻1000倍!我第一次接触 goroutine 的时候真的被惊艳到了。以前写 Java 多线程代码,各种线程池、同步、锁,复杂得不行。Go 只需要一个 go 关键字,就能轻松启动成千上万的并发任务!

什么是Goroutine?

Goroutine是Go实现的轻量级线程,由Go运行时管理,不是操作系统线程。

// 启动一个goroutine,就是在函数调用前加 go
go doSomething()

go func() {
    fmt.Println("我在goroutine中运行")
}()

Goroutine vs 线程

特性系统线程Goroutine
内存占用~1MB~2KB
创建速度慢快
切换开销大(内核态)小(用户态)
数量限制数千个数十万个
管理操作系统Go运行时

Java程序员注意

Java的线程是1:1映射到系统线程的,创建开销大。 Go的goroutine是M:N模型,多个goroutine复用少量系统线程。

// Java - 创建线程
new Thread(() -> {
    System.out.println("Hello");
}).start();
// Go - 创建goroutine(轻量100倍!)
go func() {
    fmt.Println("Hello")
}()

启动Goroutine

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello from goroutine!")
}

func main() {
    // 启动goroutine
    go sayHello()
    
    // 匿名函数
    go func() {
        fmt.Println("Hello from anonymous goroutine!")
    }()
    
    // 带参数
    go func(name string) {
        fmt.Println("Hello,", name)
    }("张三")
    
    // 主goroutine等待一下,否则程序会立即退出
    time.Sleep(time.Second)
}

注意

main函数结束,所有goroutine都会被终止!要确保goroutine有机会执行完。

并发 vs 并行

  • 并发(Concurrency):同时处理多件事的能力(结构上)
  • 并行(Parallelism):同时执行多件事(物理上)
// 并发:多个goroutine交替执行
// 并行:多个goroutine在多核上同时执行

// Go运行时会自动利用多核
runtime.GOMAXPROCS(runtime.NumCPU())  // 默认已经是这样

等待Goroutine完成

使用 sync.WaitGroup

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)  // 增加计数器
        
        go func(id int) {
            defer wg.Done()  // 完成时减少计数器
            fmt.Printf("Worker %d 开始工作\n", id)
            // 模拟工作
            fmt.Printf("Worker %d 完成工作\n", id)
        }(i)
    }
    
    wg.Wait()  // 等待所有goroutine完成
    fmt.Println("所有工作完成!")
}

WaitGroup 使用要点

var wg sync.WaitGroup

// ✅ 正确:在goroutine外Add
wg.Add(1)
go func() {
    defer wg.Done()
}()

// ❌ 错误:在goroutine内Add(可能还没Add就Wait了)
go func() {
    wg.Add(1)
    defer wg.Done()
}()

// ✅ 批量Add
wg.Add(5)
for i := 0; i < 5; i++ {
    go worker(&wg)
}

Goroutine和闭包

常见错误:循环变量

// ❌ 错误:所有goroutine共享同一个变量
for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i)  // 可能都打印5
    }()
}

// ✅ 正确:传参
for i := 0; i < 5; i++ {
    go func(n int) {
        fmt.Println(n)  // 0, 1, 2, 3, 4(顺序不定)
    }(i)
}

// ✅ 正确:创建新变量
for i := 0; i < 5; i++ {
    i := i  // 创建新变量
    go func() {
        fmt.Println(i)
    }()
}

并发安全

竞态条件

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++  // ❌ 竞态条件!
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter:", counter)  // 可能不是1000!
}

使用 sync.Mutex

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var mu sync.Mutex
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()    // 加锁
            counter++
            mu.Unlock()  // 解锁
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter:", counter)  // 一定是1000
}

使用 sync.RWMutex

读写锁:读操作可以并发,写操作互斥

type SafeMap struct {
    mu   sync.RWMutex
    data map[string]int
}

func (m *SafeMap) Get(key string) (int, bool) {
    m.mu.RLock()         // 读锁
    defer m.mu.RUnlock()
    val, ok := m.data[key]
    return val, ok
}

func (m *SafeMap) Set(key string, value int) {
    m.mu.Lock()          // 写锁
    defer m.mu.Unlock()
    m.data[key] = value
}

使用 atomic 包

原子操作,适合简单计数器:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var counter int64  // 注意:用int64
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)  // 原子加1
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter:", atomic.LoadInt64(&counter))
}

实战案例:并发下载

package main

import (
    "fmt"
    "sync"
    "time"
)

// 模拟下载文件
func download(url string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("开始下载: %s\n", url)
    
    // 模拟下载耗时
    time.Sleep(time.Duration(100+len(url)*10) * time.Millisecond)
    
    fmt.Printf("✅ 下载完成: %s\n", url)
}

func main() {
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
        "https://example.com/file4.zip",
        "https://example.com/file5.zip",
    }
    
    var wg sync.WaitGroup
    
    start := time.Now()
    
    // 并发下载
    for _, url := range urls {
        wg.Add(1)
        go download(url, &wg)
    }
    
    wg.Wait()
    
    elapsed := time.Since(start)
    fmt.Printf("\n全部下载完成! 总耗时: %v\n", elapsed)
}

实战案例:并发爬虫

package main

import (
    "fmt"
    "sync"
    "time"
)

// 爬取结果
type CrawlResult struct {
    URL     string
    Title   string
    Success bool
    Error   error
}

// 模拟爬取页面
func crawl(url string) CrawlResult {
    // 模拟网络延迟
    time.Sleep(100 * time.Millisecond)
    
    // 模拟成功率80%
    if time.Now().UnixNano()%5 == 0 {
        return CrawlResult{
            URL:     url,
            Success: false,
            Error:   fmt.Errorf("connection timeout"),
        }
    }
    
    return CrawlResult{
        URL:     url,
        Title:   fmt.Sprintf("Page: %s", url),
        Success: true,
    }
}

// 并发爬虫
func crawlAll(urls []string, concurrency int) []CrawlResult {
    results := make([]CrawlResult, len(urls))
    var mu sync.Mutex
    var wg sync.WaitGroup
    
    // 信号量:限制并发数
    sem := make(chan struct{}, concurrency)
    
    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            
            sem <- struct{}{}        // 获取信号量
            defer func() { <-sem }() // 释放信号量
            
            result := crawl(u)
            
            mu.Lock()
            results[idx] = result
            mu.Unlock()
        }(i, url)
    }
    
    wg.Wait()
    return results
}

func main() {
    urls := []string{
        "https://golang.org",
        "https://google.com",
        "https://github.com",
        "https://stackoverflow.com",
        "https://reddit.com",
        "https://twitter.com",
        "https://facebook.com",
        "https://amazon.com",
        "https://apple.com",
        "https://microsoft.com",
    }
    
    fmt.Printf("开始爬取 %d 个页面...\n\n", len(urls))
    
    start := time.Now()
    results := crawlAll(urls, 3)  // 最多3个并发
    elapsed := time.Since(start)
    
    // 统计结果
    success := 0
    failed := 0
    
    for _, r := range results {
        if r.Success {
            fmt.Printf("✅ %s - %s\n", r.URL, r.Title)
            success++
        } else {
            fmt.Printf("❌ %s - %v\n", r.URL, r.Error)
            failed++
        }
    }
    
    fmt.Printf("\n爬取完成! 成功: %d, 失败: %d, 耗时: %v\n", success, failed, elapsed)
}

实战案例:工作池模式

package main

import (
    "fmt"
    "sync"
    "time"
)

// 任务
type Job struct {
    ID   int
    Data string
}

// 结果
type Result struct {
    JobID   int
    Output  string
    Success bool
}

// 工作者
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("👷 Worker %d 处理任务 %d\n", id, job.ID)
        
        // 模拟处理
        time.Sleep(100 * time.Millisecond)
        
        results <- Result{
            JobID:   job.ID,
            Output:  fmt.Sprintf("处理结果: %s -> done", job.Data),
            Success: true,
        }
    }
    
    fmt.Printf("👷 Worker %d 退出\n", id)
}

func main() {
    numJobs := 20
    numWorkers := 3
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动工作者
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // 发送任务
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{ID: i, Data: fmt.Sprintf("任务%d", i)}
    }
    close(jobs)  // 关闭通道,通知工作者没有更多任务
    
    // 等待所有工作者完成后关闭结果通道
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("📦 结果: 任务%d - %s\n", result.JobID, result.Output)
    }
    
    fmt.Println("\n✅ 所有任务处理完成!")
}

Goroutine 泄漏

忘记关闭goroutine会导致内存泄漏:

// ❌ 泄漏:goroutine永远阻塞
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch  // 永远阻塞,因为没人发送
        fmt.Println(val)
    }()
    // 函数返回,但goroutine还在
}

// ✅ 正确:使用context取消
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            fmt.Println("cancelled")
            return
        }
    }()
}

最佳实践

1. 明确生命周期

// 确保每个goroutine都能正确退出
func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return  // 收到取消信号,退出
        default:
            // 继续工作
        }
    }
}

2. 限制并发数

// 使用有缓冲channel作为信号量
sem := make(chan struct{}, maxConcurrency)

for _, item := range items {
    sem <- struct{}{}  // 获取槽位
    go func(it Item) {
        defer func() { <-sem }()  // 释放槽位
        process(it)
    }(item)
}

3. 不要在goroutine中panic

go func() {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("recovered: %v", r)
        }
    }()
    
    riskyOperation()
}()

练习

  1. 创建5个goroutine,每个打印自己的编号
  2. 实现一个并发计数器,使用mutex保证安全
  3. 使用WaitGroup等待多个goroutine完成
参考答案
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// 1. 打印编号
func printNumbers() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d\n", id)
        }(i)
    }
    
    wg.Wait()
}

// 2. 并发计数器
type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

// 或用atomic
type AtomicCounter struct {
    value int64
}

func (c *AtomicCounter) Inc() {
    atomic.AddInt64(&c.value, 1)
}

func (c *AtomicCounter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

// 3. WaitGroup等待
func waitExample() {
    var wg sync.WaitGroup
    
    tasks := []string{"任务A", "任务B", "任务C"}
    
    for _, task := range tasks {
        wg.Add(1)
        go func(t string) {
            defer wg.Done()
            fmt.Printf("开始: %s\n", t)
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("完成: %s\n", t)
        }(task)
    }
    
    wg.Wait()
    fmt.Println("所有任务完成")
}

func main() {
    fmt.Println("=== 打印编号 ===")
    printNumbers()
    
    fmt.Println("\n=== 并发计数器 ===")
    counter := &Counter{}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc()
        }()
    }
    wg.Wait()
    fmt.Printf("Counter: %d\n", counter.Value())
    
    fmt.Println("\n=== WaitGroup ===")
    waitExample()
}

goroutine让并发变得简单,但goroutine之间如何通信呢?下一节学习Channel!

最近更新: 2025/12/27 13:26
Contributors: 王长安
Prev
错误处理
Next
Channel