Goroutine
Goroutine是Go的杀手锉!超轻量级的并发,比线程轻1000倍!我第一次接触 goroutine 的时候真的被惊艳到了。以前写 Java 多线程代码,各种线程池、同步、锁,复杂得不行。Go 只需要一个 go 关键字,就能轻松启动成千上万的并发任务!
什么是Goroutine?
Goroutine是Go实现的轻量级线程,由Go运行时管理,不是操作系统线程。
// 启动一个goroutine,就是在函数调用前加 go
go doSomething()
go func() {
fmt.Println("我在goroutine中运行")
}()
Goroutine vs 线程
| 特性 | 系统线程 | Goroutine |
|---|---|---|
| 内存占用 | ~1MB | ~2KB |
| 创建速度 | 慢 | 快 |
| 切换开销 | 大(内核态) | 小(用户态) |
| 数量限制 | 数千个 | 数十万个 |
| 管理 | 操作系统 | Go运行时 |
Java程序员注意
Java的线程是1:1映射到系统线程的,创建开销大。 Go的goroutine是M:N模型,多个goroutine复用少量系统线程。
// Java - 创建线程
new Thread(() -> {
System.out.println("Hello");
}).start();
// Go - 创建goroutine(轻量100倍!)
go func() {
fmt.Println("Hello")
}()
启动Goroutine
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
// 启动goroutine
go sayHello()
// 匿名函数
go func() {
fmt.Println("Hello from anonymous goroutine!")
}()
// 带参数
go func(name string) {
fmt.Println("Hello,", name)
}("张三")
// 主goroutine等待一下,否则程序会立即退出
time.Sleep(time.Second)
}
注意
main函数结束,所有goroutine都会被终止!要确保goroutine有机会执行完。
并发 vs 并行
- 并发(Concurrency):同时处理多件事的能力(结构上)
- 并行(Parallelism):同时执行多件事(物理上)
// 并发:多个goroutine交替执行
// 并行:多个goroutine在多核上同时执行
// Go运行时会自动利用多核
runtime.GOMAXPROCS(runtime.NumCPU()) // 默认已经是这样
等待Goroutine完成
使用 sync.WaitGroup
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加计数器
go func(id int) {
defer wg.Done() // 完成时减少计数器
fmt.Printf("Worker %d 开始工作\n", id)
// 模拟工作
fmt.Printf("Worker %d 完成工作\n", id)
}(i)
}
wg.Wait() // 等待所有goroutine完成
fmt.Println("所有工作完成!")
}
WaitGroup 使用要点
var wg sync.WaitGroup
// ✅ 正确:在goroutine外Add
wg.Add(1)
go func() {
defer wg.Done()
}()
// ❌ 错误:在goroutine内Add(可能还没Add就Wait了)
go func() {
wg.Add(1)
defer wg.Done()
}()
// ✅ 批量Add
wg.Add(5)
for i := 0; i < 5; i++ {
go worker(&wg)
}
Goroutine和闭包
常见错误:循环变量
// ❌ 错误:所有goroutine共享同一个变量
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // 可能都打印5
}()
}
// ✅ 正确:传参
for i := 0; i < 5; i++ {
go func(n int) {
fmt.Println(n) // 0, 1, 2, 3, 4(顺序不定)
}(i)
}
// ✅ 正确:创建新变量
for i := 0; i < 5; i++ {
i := i // 创建新变量
go func() {
fmt.Println(i)
}()
}
并发安全
竞态条件
package main
import (
"fmt"
"sync"
)
func main() {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // ❌ 竞态条件!
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 可能不是1000!
}
使用 sync.Mutex
package main
import (
"fmt"
"sync"
)
func main() {
counter := 0
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock() // 加锁
counter++
mu.Unlock() // 解锁
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 一定是1000
}
使用 sync.RWMutex
读写锁:读操作可以并发,写操作互斥
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func (m *SafeMap) Get(key string) (int, bool) {
m.mu.RLock() // 读锁
defer m.mu.RUnlock()
val, ok := m.data[key]
return val, ok
}
func (m *SafeMap) Set(key string, value int) {
m.mu.Lock() // 写锁
defer m.mu.Unlock()
m.data[key] = value
}
使用 atomic 包
原子操作,适合简单计数器:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64 // 注意:用int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子加1
}()
}
wg.Wait()
fmt.Println("Counter:", atomic.LoadInt64(&counter))
}
实战案例:并发下载
package main
import (
"fmt"
"sync"
"time"
)
// 模拟下载文件
func download(url string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("开始下载: %s\n", url)
// 模拟下载耗时
time.Sleep(time.Duration(100+len(url)*10) * time.Millisecond)
fmt.Printf("✅ 下载完成: %s\n", url)
}
func main() {
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
"https://example.com/file4.zip",
"https://example.com/file5.zip",
}
var wg sync.WaitGroup
start := time.Now()
// 并发下载
for _, url := range urls {
wg.Add(1)
go download(url, &wg)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("\n全部下载完成! 总耗时: %v\n", elapsed)
}
实战案例:并发爬虫
package main
import (
"fmt"
"sync"
"time"
)
// 爬取结果
type CrawlResult struct {
URL string
Title string
Success bool
Error error
}
// 模拟爬取页面
func crawl(url string) CrawlResult {
// 模拟网络延迟
time.Sleep(100 * time.Millisecond)
// 模拟成功率80%
if time.Now().UnixNano()%5 == 0 {
return CrawlResult{
URL: url,
Success: false,
Error: fmt.Errorf("connection timeout"),
}
}
return CrawlResult{
URL: url,
Title: fmt.Sprintf("Page: %s", url),
Success: true,
}
}
// 并发爬虫
func crawlAll(urls []string, concurrency int) []CrawlResult {
results := make([]CrawlResult, len(urls))
var mu sync.Mutex
var wg sync.WaitGroup
// 信号量:限制并发数
sem := make(chan struct{}, concurrency)
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
sem <- struct{}{} // 获取信号量
defer func() { <-sem }() // 释放信号量
result := crawl(u)
mu.Lock()
results[idx] = result
mu.Unlock()
}(i, url)
}
wg.Wait()
return results
}
func main() {
urls := []string{
"https://golang.org",
"https://google.com",
"https://github.com",
"https://stackoverflow.com",
"https://reddit.com",
"https://twitter.com",
"https://facebook.com",
"https://amazon.com",
"https://apple.com",
"https://microsoft.com",
}
fmt.Printf("开始爬取 %d 个页面...\n\n", len(urls))
start := time.Now()
results := crawlAll(urls, 3) // 最多3个并发
elapsed := time.Since(start)
// 统计结果
success := 0
failed := 0
for _, r := range results {
if r.Success {
fmt.Printf("✅ %s - %s\n", r.URL, r.Title)
success++
} else {
fmt.Printf("❌ %s - %v\n", r.URL, r.Error)
failed++
}
}
fmt.Printf("\n爬取完成! 成功: %d, 失败: %d, 耗时: %v\n", success, failed, elapsed)
}
实战案例:工作池模式
package main
import (
"fmt"
"sync"
"time"
)
// 任务
type Job struct {
ID int
Data string
}
// 结果
type Result struct {
JobID int
Output string
Success bool
}
// 工作者
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("👷 Worker %d 处理任务 %d\n", id, job.ID)
// 模拟处理
time.Sleep(100 * time.Millisecond)
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("处理结果: %s -> done", job.Data),
Success: true,
}
}
fmt.Printf("👷 Worker %d 退出\n", id)
}
func main() {
numJobs := 20
numWorkers := 3
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 启动工作者
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("任务%d", i)}
}
close(jobs) // 关闭通道,通知工作者没有更多任务
// 等待所有工作者完成后关闭结果通道
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("📦 结果: 任务%d - %s\n", result.JobID, result.Output)
}
fmt.Println("\n✅ 所有任务处理完成!")
}
Goroutine 泄漏
忘记关闭goroutine会导致内存泄漏:
// ❌ 泄漏:goroutine永远阻塞
func leak() {
ch := make(chan int)
go func() {
val := <-ch // 永远阻塞,因为没人发送
fmt.Println(val)
}()
// 函数返回,但goroutine还在
}
// ✅ 正确:使用context取消
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
fmt.Println("cancelled")
return
}
}()
}
最佳实践
1. 明确生命周期
// 确保每个goroutine都能正确退出
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return // 收到取消信号,退出
default:
// 继续工作
}
}
}
2. 限制并发数
// 使用有缓冲channel作为信号量
sem := make(chan struct{}, maxConcurrency)
for _, item := range items {
sem <- struct{}{} // 获取槽位
go func(it Item) {
defer func() { <-sem }() // 释放槽位
process(it)
}(item)
}
3. 不要在goroutine中panic
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("recovered: %v", r)
}
}()
riskyOperation()
}()
练习
- 创建5个goroutine,每个打印自己的编号
- 实现一个并发计数器,使用mutex保证安全
- 使用WaitGroup等待多个goroutine完成
参考答案
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 1. 打印编号
func printNumbers() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d\n", id)
}(i)
}
wg.Wait()
}
// 2. 并发计数器
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// 或用atomic
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Inc() {
atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
// 3. WaitGroup等待
func waitExample() {
var wg sync.WaitGroup
tasks := []string{"任务A", "任务B", "任务C"}
for _, task := range tasks {
wg.Add(1)
go func(t string) {
defer wg.Done()
fmt.Printf("开始: %s\n", t)
time.Sleep(100 * time.Millisecond)
fmt.Printf("完成: %s\n", t)
}(task)
}
wg.Wait()
fmt.Println("所有任务完成")
}
func main() {
fmt.Println("=== 打印编号 ===")
printNumbers()
fmt.Println("\n=== 并发计数器 ===")
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc()
}()
}
wg.Wait()
fmt.Printf("Counter: %d\n", counter.Value())
fmt.Println("\n=== WaitGroup ===")
waitExample()
}
goroutine让并发变得简单,但goroutine之间如何通信呢?下一节学习Channel!
