Channel
Channel是goroutine之间通信的管道,是Go并发的灵魂!我刚开始学并发的时候,最头疼的就是各种锁、同步原语。Channel 的设计让我眼前一亮,原来并发可以这么优雅!“不要通过共享内存来通信,而要通过通信来共享内存”,这句话真是太经典了!
Go的并发哲学
不要通过共享内存来通信,而要通过通信来共享内存。 — Go 并发格言
// ❌ 传统方式:共享内存 + 锁
var data int
var mu sync.Mutex
mu.Lock()
data = 42
mu.Unlock()
// ✅ Go方式:通过channel通信
ch := make(chan int)
ch <- 42 // 发送
data := <-ch // 接收
创建Channel
// 无缓冲channel
ch1 := make(chan int)
// 有缓冲channel(容量为5)
ch2 := make(chan int, 5)
// 声明(nil channel)
var ch3 chan int
// 只读channel
var readOnly <-chan int
// 只写channel
var writeOnly chan<- int
Channel 操作
发送和接收
ch := make(chan int)
// 发送(在另一个goroutine)
go func() {
ch <- 42 // 发送值
}()
// 接收
value := <-ch
fmt.Println(value) // 42
关闭Channel
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch) // 关闭channel
// 关闭后仍可接收
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
fmt.Println(<-ch) // 0(零值,channel已关闭且空)
// 检测channel是否关闭
value, ok := <-ch
if !ok {
fmt.Println("channel已关闭")
}
关闭Channel的规则
- 只有发送方应该关闭channel
- 关闭已关闭的channel会panic
- 向已关闭的channel发送会panic
- 从已关闭的channel接收会返回零值
无缓冲 vs 有缓冲
无缓冲Channel(同步)
发送和接收必须同时就绪,否则阻塞。
ch := make(chan int) // 无缓冲
go func() {
fmt.Println("准备发送...")
ch <- 42 // 阻塞,直到有人接收
fmt.Println("发送完成")
}()
time.Sleep(time.Second)
fmt.Println("准备接收...")
fmt.Println(<-ch) // 此时发送方才能继续
有缓冲Channel(异步)
缓冲区满时发送阻塞,空时接收阻塞。
ch := make(chan int, 3) // 缓冲容量3
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 不阻塞
// ch <- 4 // 会阻塞!缓冲区满
fmt.Println(len(ch)) // 3(当前元素数)
fmt.Println(cap(ch)) // 3(容量)
遍历Channel
for range
ch := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch) // 关闭channel
}()
// 遍历直到channel关闭
for value := range ch {
fmt.Println(value)
}
// 输出: 1 2 3 4 5
select 语句
同时处理多个channel操作:
ch1 := make(chan int)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- 1
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "hello"
}()
// 哪个先就绪就执行哪个
select {
case v := <-ch1:
fmt.Println("从ch1收到:", v)
case v := <-ch2:
fmt.Println("从ch2收到:", v)
}
非阻塞操作
select {
case msg := <-ch:
fmt.Println("收到消息:", msg)
default:
fmt.Println("没有消息,继续其他工作")
}
超时处理
select {
case result := <-ch:
fmt.Println("收到结果:", result)
case <-time.After(3 * time.Second):
fmt.Println("超时了!")
}
常用Channel模式
1. 信号/通知
done := make(chan struct{})
go func() {
// 做一些工作
time.Sleep(time.Second)
close(done) // 发送完成信号
}()
<-done // 等待完成
fmt.Println("工作完成")
2. 生产者-消费者
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println("消费:", v)
}
}
func main() {
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
}
3. 扇出(一对多)
func fanOut(ch <-chan int, n int) []chan int {
channels := make([]chan int, n)
for i := 0; i < n; i++ {
channels[i] = make(chan int)
go func(c chan int) {
for v := range ch {
c <- v
}
close(c)
}(channels[i])
}
return channels
}
4. 扇入(多对一)
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
5. 流水线
// 阶段1:生成数字
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段3:打印
func print(in <-chan int) {
for n := range in {
fmt.Println(n)
}
}
func main() {
// 流水线: 生成 -> 平方 -> 打印
nums := generate(1, 2, 3, 4, 5)
squares := square(nums)
print(squares)
// 输出: 1 4 9 16 25
}
实战案例:聊天室
package main
import (
"bufio"
"fmt"
"os"
"strings"
"time"
)
// 消息
type Message struct {
User string
Content string
Time time.Time
}
// 聊天室
type ChatRoom struct {
messages chan Message
users map[string]chan Message
join chan string
leave chan string
}
func NewChatRoom() *ChatRoom {
return &ChatRoom{
messages: make(chan Message, 100),
users: make(map[string]chan Message),
join: make(chan string),
leave: make(chan string),
}
}
// 运行聊天室
func (c *ChatRoom) Run() {
for {
select {
case user := <-c.join:
c.users[user] = make(chan Message, 10)
c.broadcast(Message{
User: "系统",
Content: fmt.Sprintf("%s 加入了聊天室", user),
Time: time.Now(),
})
case user := <-c.leave:
if ch, ok := c.users[user]; ok {
close(ch)
delete(c.users, user)
c.broadcast(Message{
User: "系统",
Content: fmt.Sprintf("%s 离开了聊天室", user),
Time: time.Now(),
})
}
case msg := <-c.messages:
c.broadcast(msg)
}
}
}
// 广播消息
func (c *ChatRoom) broadcast(msg Message) {
for _, ch := range c.users {
select {
case ch <- msg:
default:
// 如果用户的channel满了,跳过
}
}
}
// 用户加入
func (c *ChatRoom) Join(user string) <-chan Message {
c.join <- user
return c.users[user]
}
// 用户离开
func (c *ChatRoom) Leave(user string) {
c.leave <- user
}
// 发送消息
func (c *ChatRoom) Send(user, content string) {
c.messages <- Message{
User: user,
Content: content,
Time: time.Now(),
}
}
func main() {
room := NewChatRoom()
go room.Run()
// 模拟用户
users := []string{"张三", "李四", "王五"}
for _, user := range users {
msgs := room.Join(user)
// 接收消息的goroutine
go func(name string, ch <-chan Message) {
for msg := range ch {
fmt.Printf("[%s] %s: %s\n",
msg.Time.Format("15:04:05"),
msg.User,
msg.Content)
}
}(user, msgs)
}
// 模拟聊天
time.Sleep(100 * time.Millisecond)
room.Send("张三", "大家好!")
time.Sleep(50 * time.Millisecond)
room.Send("李四", "你好啊,张三!")
time.Sleep(50 * time.Millisecond)
room.Send("王五", "今天天气不错")
time.Sleep(100 * time.Millisecond)
room.Leave("李四")
time.Sleep(100 * time.Millisecond)
room.Send("张三", "李四走了?")
time.Sleep(time.Second)
}
实战案例:任务调度器
package main
import (
"context"
"fmt"
"sync"
"time"
)
// 任务
type Task struct {
ID int
Name string
Duration time.Duration
}
// 任务结果
type TaskResult struct {
TaskID int
Success bool
Message string
}
// 调度器
type Scheduler struct {
tasks chan Task
results chan TaskResult
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewScheduler(workers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
tasks: make(chan Task, 100),
results: make(chan TaskResult, 100),
workers: workers,
ctx: ctx,
cancel: cancel,
}
}
// 启动调度器
func (s *Scheduler) Start() {
// 启动工作者
for i := 1; i <= s.workers; i++ {
s.wg.Add(1)
go s.worker(i)
}
// 启动结果收集器
go s.collectResults()
}
// 工作者
func (s *Scheduler) worker(id int) {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
fmt.Printf("Worker %d 停止\n", id)
return
case task, ok := <-s.tasks:
if !ok {
return
}
fmt.Printf("👷 Worker %d 执行任务 %d: %s\n", id, task.ID, task.Name)
// 模拟任务执行
time.Sleep(task.Duration)
s.results <- TaskResult{
TaskID: task.ID,
Success: true,
Message: fmt.Sprintf("任务 %s 完成", task.Name),
}
}
}
}
// 收集结果
func (s *Scheduler) collectResults() {
for result := range s.results {
status := "✅"
if !result.Success {
status = "❌"
}
fmt.Printf("%s 任务 %d: %s\n", status, result.TaskID, result.Message)
}
}
// 提交任务
func (s *Scheduler) Submit(task Task) {
s.tasks <- task
}
// 停止调度器
func (s *Scheduler) Stop() {
close(s.tasks)
s.wg.Wait()
s.cancel()
close(s.results)
}
func main() {
scheduler := NewScheduler(3)
scheduler.Start()
// 提交任务
tasks := []Task{
{1, "发送邮件", 100 * time.Millisecond},
{2, "生成报告", 200 * time.Millisecond},
{3, "数据同步", 150 * time.Millisecond},
{4, "清理缓存", 50 * time.Millisecond},
{5, "备份数据", 300 * time.Millisecond},
}
for _, task := range tasks {
scheduler.Submit(task)
}
// 等待任务完成
time.Sleep(2 * time.Second)
scheduler.Stop()
fmt.Println("调度器已停止")
}
实战案例:限流器
package main
import (
"fmt"
"time"
)
// 令牌桶限流器
type RateLimiter struct {
tokens chan struct{}
interval time.Duration
stop chan struct{}
}
func NewRateLimiter(rate int, interval time.Duration) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, rate),
interval: interval,
stop: make(chan struct{}),
}
// 初始填满令牌桶
for i := 0; i < rate; i++ {
rl.tokens <- struct{}{}
}
// 定时添加令牌
go rl.refill(rate)
return rl
}
func (rl *RateLimiter) refill(rate int) {
ticker := time.NewTicker(rl.interval / time.Duration(rate))
defer ticker.Stop()
for {
select {
case <-rl.stop:
return
case <-ticker.C:
select {
case rl.tokens <- struct{}{}:
default:
// 令牌桶满了
}
}
}
}
// 获取令牌
func (rl *RateLimiter) Allow() bool {
select {
case <-rl.tokens:
return true
default:
return false
}
}
// 等待获取令牌
func (rl *RateLimiter) Wait() {
<-rl.tokens
}
// 停止限流器
func (rl *RateLimiter) Stop() {
close(rl.stop)
}
func main() {
// 每秒5个请求
limiter := NewRateLimiter(5, time.Second)
defer limiter.Stop()
// 模拟请求
for i := 1; i <= 15; i++ {
if limiter.Allow() {
fmt.Printf("✅ 请求 %d 通过\n", i)
} else {
fmt.Printf("❌ 请求 %d 被限流\n", i)
}
time.Sleep(100 * time.Millisecond)
}
}
Channel 最佳实践
1. 明确所有权
// 创建者负责关闭
func producer() <-chan int {
ch := make(chan int)
go func() {
defer close(ch) // 生产者关闭
for i := 0; i < 10; i++ {
ch <- i
}
}()
return ch
}
2. 避免死锁
// ❌ 死锁:无缓冲channel,同一goroutine发送和接收
ch := make(chan int)
ch <- 1 // 阻塞,等待接收
v := <-ch // 永远执行不到
// ✅ 正确:不同goroutine
ch := make(chan int)
go func() { ch <- 1 }()
v := <-ch
3. 使用select处理多个channel
select {
case <-done:
return
case msg := <-messages:
process(msg)
case <-time.After(timeout):
handleTimeout()
}
练习
- 实现一个简单的生产者-消费者模式
- 使用channel实现两个goroutine交替打印1-100
- 实现一个超时处理的函数调用
参考答案
package main
import (
"fmt"
"time"
)
// 1. 生产者-消费者
func producerConsumer() {
ch := make(chan int, 5)
done := make(chan struct{})
// 生产者
go func() {
for i := 1; i <= 10; i++ {
ch <- i
fmt.Printf("生产: %d\n", i)
}
close(ch)
}()
// 消费者
go func() {
for v := range ch {
fmt.Printf("消费: %d\n", v)
time.Sleep(50 * time.Millisecond)
}
close(done)
}()
<-done
}
// 2. 交替打印
func alternatePrint() {
ch1 := make(chan struct{})
ch2 := make(chan struct{})
done := make(chan struct{})
// 打印奇数
go func() {
for i := 1; i <= 100; i += 2 {
<-ch1
fmt.Printf("Goroutine1: %d\n", i)
ch2 <- struct{}{}
}
}()
// 打印偶数
go func() {
for i := 2; i <= 100; i += 2 {
<-ch2
fmt.Printf("Goroutine2: %d\n", i)
if i < 100 {
ch1 <- struct{}{}
} else {
close(done)
}
}
}()
ch1 <- struct{}{} // 启动
<-done
}
// 3. 超时处理
func doWithTimeout(fn func() string, timeout time.Duration) (string, error) {
result := make(chan string, 1)
go func() {
result <- fn()
}()
select {
case res := <-result:
return res, nil
case <-time.After(timeout):
return "", fmt.Errorf("执行超时")
}
}
func main() {
fmt.Println("=== 生产者-消费者 ===")
producerConsumer()
fmt.Println("\n=== 交替打印 ===")
alternatePrint()
fmt.Println("\n=== 超时处理 ===")
result, err := doWithTimeout(func() string {
time.Sleep(500 * time.Millisecond)
return "完成"
}, 1*time.Second)
if err != nil {
fmt.Println("错误:", err)
} else {
fmt.Println("结果:", result)
}
}
Channel是Go并发编程的核心,下一节学习包管理!
