Go语言快速入门Go语言快速入门
首页
基础篇
进阶篇
高阶篇
实战篇
Go官方网站
编程指南
首页
基础篇
进阶篇
高阶篇
实战篇
Go官方网站
编程指南
  • 进阶篇

    • 🚀 进阶篇
    • 方法
    • 接口
    • 错误处理
    • Goroutine
    • Channel
    • 包管理
    • 单元测试

Channel

Channel是goroutine之间通信的管道,是Go并发的灵魂!我刚开始学并发的时候,最头疼的就是各种锁、同步原语。Channel 的设计让我眼前一亮,原来并发可以这么优雅!“不要通过共享内存来通信,而要通过通信来共享内存”,这句话真是太经典了!

Go的并发哲学

不要通过共享内存来通信,而要通过通信来共享内存。 — Go 并发格言

// ❌ 传统方式:共享内存 + 锁
var data int
var mu sync.Mutex

mu.Lock()
data = 42
mu.Unlock()

// ✅ Go方式:通过channel通信
ch := make(chan int)
ch <- 42      // 发送
data := <-ch  // 接收

创建Channel

// 无缓冲channel
ch1 := make(chan int)

// 有缓冲channel(容量为5)
ch2 := make(chan int, 5)

// 声明(nil channel)
var ch3 chan int

// 只读channel
var readOnly <-chan int

// 只写channel
var writeOnly chan<- int

Channel 操作

发送和接收

ch := make(chan int)

// 发送(在另一个goroutine)
go func() {
    ch <- 42  // 发送值
}()

// 接收
value := <-ch
fmt.Println(value)  // 42

关闭Channel

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

close(ch)  // 关闭channel

// 关闭后仍可接收
fmt.Println(<-ch)  // 1
fmt.Println(<-ch)  // 2
fmt.Println(<-ch)  // 3
fmt.Println(<-ch)  // 0(零值,channel已关闭且空)

// 检测channel是否关闭
value, ok := <-ch
if !ok {
    fmt.Println("channel已关闭")
}

关闭Channel的规则

  • 只有发送方应该关闭channel
  • 关闭已关闭的channel会panic
  • 向已关闭的channel发送会panic
  • 从已关闭的channel接收会返回零值

无缓冲 vs 有缓冲

无缓冲Channel(同步)

发送和接收必须同时就绪,否则阻塞。

ch := make(chan int)  // 无缓冲

go func() {
    fmt.Println("准备发送...")
    ch <- 42  // 阻塞,直到有人接收
    fmt.Println("发送完成")
}()

time.Sleep(time.Second)
fmt.Println("准备接收...")
fmt.Println(<-ch)  // 此时发送方才能继续

有缓冲Channel(异步)

缓冲区满时发送阻塞,空时接收阻塞。

ch := make(chan int, 3)  // 缓冲容量3

ch <- 1  // 不阻塞
ch <- 2  // 不阻塞
ch <- 3  // 不阻塞
// ch <- 4  // 会阻塞!缓冲区满

fmt.Println(len(ch))  // 3(当前元素数)
fmt.Println(cap(ch))  // 3(容量)

遍历Channel

for range

ch := make(chan int)

go func() {
    for i := 1; i <= 5; i++ {
        ch <- i
    }
    close(ch)  // 关闭channel
}()

// 遍历直到channel关闭
for value := range ch {
    fmt.Println(value)
}
// 输出: 1 2 3 4 5

select 语句

同时处理多个channel操作:

ch1 := make(chan int)
ch2 := make(chan string)

go func() {
    time.Sleep(100 * time.Millisecond)
    ch1 <- 1
}()

go func() {
    time.Sleep(200 * time.Millisecond)
    ch2 <- "hello"
}()

// 哪个先就绪就执行哪个
select {
case v := <-ch1:
    fmt.Println("从ch1收到:", v)
case v := <-ch2:
    fmt.Println("从ch2收到:", v)
}

非阻塞操作

select {
case msg := <-ch:
    fmt.Println("收到消息:", msg)
default:
    fmt.Println("没有消息,继续其他工作")
}

超时处理

select {
case result := <-ch:
    fmt.Println("收到结果:", result)
case <-time.After(3 * time.Second):
    fmt.Println("超时了!")
}

常用Channel模式

1. 信号/通知

done := make(chan struct{})

go func() {
    // 做一些工作
    time.Sleep(time.Second)
    close(done)  // 发送完成信号
}()

<-done  // 等待完成
fmt.Println("工作完成")

2. 生产者-消费者

func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Println("消费:", v)
    }
}

func main() {
    ch := make(chan int, 5)
    
    go producer(ch)
    consumer(ch)
}

3. 扇出(一对多)

func fanOut(ch <-chan int, n int) []chan int {
    channels := make([]chan int, n)
    for i := 0; i < n; i++ {
        channels[i] = make(chan int)
        go func(c chan int) {
            for v := range ch {
                c <- v
            }
            close(c)
        }(channels[i])
    }
    return channels
}

4. 扇入(多对一)

func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

5. 流水线

// 阶段1:生成数字
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段2:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 阶段3:打印
func print(in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

func main() {
    // 流水线: 生成 -> 平方 -> 打印
    nums := generate(1, 2, 3, 4, 5)
    squares := square(nums)
    print(squares)
    // 输出: 1 4 9 16 25
}

实战案例:聊天室

package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"
    "time"
)

// 消息
type Message struct {
    User    string
    Content string
    Time    time.Time
}

// 聊天室
type ChatRoom struct {
    messages chan Message
    users    map[string]chan Message
    join     chan string
    leave    chan string
}

func NewChatRoom() *ChatRoom {
    return &ChatRoom{
        messages: make(chan Message, 100),
        users:    make(map[string]chan Message),
        join:     make(chan string),
        leave:    make(chan string),
    }
}

// 运行聊天室
func (c *ChatRoom) Run() {
    for {
        select {
        case user := <-c.join:
            c.users[user] = make(chan Message, 10)
            c.broadcast(Message{
                User:    "系统",
                Content: fmt.Sprintf("%s 加入了聊天室", user),
                Time:    time.Now(),
            })
            
        case user := <-c.leave:
            if ch, ok := c.users[user]; ok {
                close(ch)
                delete(c.users, user)
                c.broadcast(Message{
                    User:    "系统",
                    Content: fmt.Sprintf("%s 离开了聊天室", user),
                    Time:    time.Now(),
                })
            }
            
        case msg := <-c.messages:
            c.broadcast(msg)
        }
    }
}

// 广播消息
func (c *ChatRoom) broadcast(msg Message) {
    for _, ch := range c.users {
        select {
        case ch <- msg:
        default:
            // 如果用户的channel满了,跳过
        }
    }
}

// 用户加入
func (c *ChatRoom) Join(user string) <-chan Message {
    c.join <- user
    return c.users[user]
}

// 用户离开
func (c *ChatRoom) Leave(user string) {
    c.leave <- user
}

// 发送消息
func (c *ChatRoom) Send(user, content string) {
    c.messages <- Message{
        User:    user,
        Content: content,
        Time:    time.Now(),
    }
}

func main() {
    room := NewChatRoom()
    go room.Run()
    
    // 模拟用户
    users := []string{"张三", "李四", "王五"}
    
    for _, user := range users {
        msgs := room.Join(user)
        
        // 接收消息的goroutine
        go func(name string, ch <-chan Message) {
            for msg := range ch {
                fmt.Printf("[%s] %s: %s\n",
                    msg.Time.Format("15:04:05"),
                    msg.User,
                    msg.Content)
            }
        }(user, msgs)
    }
    
    // 模拟聊天
    time.Sleep(100 * time.Millisecond)
    room.Send("张三", "大家好!")
    time.Sleep(50 * time.Millisecond)
    room.Send("李四", "你好啊,张三!")
    time.Sleep(50 * time.Millisecond)
    room.Send("王五", "今天天气不错")
    
    time.Sleep(100 * time.Millisecond)
    room.Leave("李四")
    
    time.Sleep(100 * time.Millisecond)
    room.Send("张三", "李四走了?")
    
    time.Sleep(time.Second)
}

实战案例:任务调度器

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 任务
type Task struct {
    ID       int
    Name     string
    Duration time.Duration
}

// 任务结果
type TaskResult struct {
    TaskID  int
    Success bool
    Message string
}

// 调度器
type Scheduler struct {
    tasks    chan Task
    results  chan TaskResult
    workers  int
    ctx      context.Context
    cancel   context.CancelFunc
    wg       sync.WaitGroup
}

func NewScheduler(workers int) *Scheduler {
    ctx, cancel := context.WithCancel(context.Background())
    return &Scheduler{
        tasks:   make(chan Task, 100),
        results: make(chan TaskResult, 100),
        workers: workers,
        ctx:     ctx,
        cancel:  cancel,
    }
}

// 启动调度器
func (s *Scheduler) Start() {
    // 启动工作者
    for i := 1; i <= s.workers; i++ {
        s.wg.Add(1)
        go s.worker(i)
    }
    
    // 启动结果收集器
    go s.collectResults()
}

// 工作者
func (s *Scheduler) worker(id int) {
    defer s.wg.Done()
    
    for {
        select {
        case <-s.ctx.Done():
            fmt.Printf("Worker %d 停止\n", id)
            return
            
        case task, ok := <-s.tasks:
            if !ok {
                return
            }
            
            fmt.Printf("👷 Worker %d 执行任务 %d: %s\n", id, task.ID, task.Name)
            
            // 模拟任务执行
            time.Sleep(task.Duration)
            
            s.results <- TaskResult{
                TaskID:  task.ID,
                Success: true,
                Message: fmt.Sprintf("任务 %s 完成", task.Name),
            }
        }
    }
}

// 收集结果
func (s *Scheduler) collectResults() {
    for result := range s.results {
        status := "✅"
        if !result.Success {
            status = "❌"
        }
        fmt.Printf("%s 任务 %d: %s\n", status, result.TaskID, result.Message)
    }
}

// 提交任务
func (s *Scheduler) Submit(task Task) {
    s.tasks <- task
}

// 停止调度器
func (s *Scheduler) Stop() {
    close(s.tasks)
    s.wg.Wait()
    s.cancel()
    close(s.results)
}

func main() {
    scheduler := NewScheduler(3)
    scheduler.Start()
    
    // 提交任务
    tasks := []Task{
        {1, "发送邮件", 100 * time.Millisecond},
        {2, "生成报告", 200 * time.Millisecond},
        {3, "数据同步", 150 * time.Millisecond},
        {4, "清理缓存", 50 * time.Millisecond},
        {5, "备份数据", 300 * time.Millisecond},
    }
    
    for _, task := range tasks {
        scheduler.Submit(task)
    }
    
    // 等待任务完成
    time.Sleep(2 * time.Second)
    scheduler.Stop()
    
    fmt.Println("调度器已停止")
}

实战案例:限流器

package main

import (
    "fmt"
    "time"
)

// 令牌桶限流器
type RateLimiter struct {
    tokens   chan struct{}
    interval time.Duration
    stop     chan struct{}
}

func NewRateLimiter(rate int, interval time.Duration) *RateLimiter {
    rl := &RateLimiter{
        tokens:   make(chan struct{}, rate),
        interval: interval,
        stop:     make(chan struct{}),
    }
    
    // 初始填满令牌桶
    for i := 0; i < rate; i++ {
        rl.tokens <- struct{}{}
    }
    
    // 定时添加令牌
    go rl.refill(rate)
    
    return rl
}

func (rl *RateLimiter) refill(rate int) {
    ticker := time.NewTicker(rl.interval / time.Duration(rate))
    defer ticker.Stop()
    
    for {
        select {
        case <-rl.stop:
            return
        case <-ticker.C:
            select {
            case rl.tokens <- struct{}{}:
            default:
                // 令牌桶满了
            }
        }
    }
}

// 获取令牌
func (rl *RateLimiter) Allow() bool {
    select {
    case <-rl.tokens:
        return true
    default:
        return false
    }
}

// 等待获取令牌
func (rl *RateLimiter) Wait() {
    <-rl.tokens
}

// 停止限流器
func (rl *RateLimiter) Stop() {
    close(rl.stop)
}

func main() {
    // 每秒5个请求
    limiter := NewRateLimiter(5, time.Second)
    defer limiter.Stop()
    
    // 模拟请求
    for i := 1; i <= 15; i++ {
        if limiter.Allow() {
            fmt.Printf("✅ 请求 %d 通过\n", i)
        } else {
            fmt.Printf("❌ 请求 %d 被限流\n", i)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

Channel 最佳实践

1. 明确所有权

// 创建者负责关闭
func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)  // 生产者关闭
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()
    return ch
}

2. 避免死锁

// ❌ 死锁:无缓冲channel,同一goroutine发送和接收
ch := make(chan int)
ch <- 1   // 阻塞,等待接收
v := <-ch // 永远执行不到

// ✅ 正确:不同goroutine
ch := make(chan int)
go func() { ch <- 1 }()
v := <-ch

3. 使用select处理多个channel

select {
case <-done:
    return
case msg := <-messages:
    process(msg)
case <-time.After(timeout):
    handleTimeout()
}

练习

  1. 实现一个简单的生产者-消费者模式
  2. 使用channel实现两个goroutine交替打印1-100
  3. 实现一个超时处理的函数调用
参考答案
package main

import (
    "fmt"
    "time"
)

// 1. 生产者-消费者
func producerConsumer() {
    ch := make(chan int, 5)
    done := make(chan struct{})
    
    // 生产者
    go func() {
        for i := 1; i <= 10; i++ {
            ch <- i
            fmt.Printf("生产: %d\n", i)
        }
        close(ch)
    }()
    
    // 消费者
    go func() {
        for v := range ch {
            fmt.Printf("消费: %d\n", v)
            time.Sleep(50 * time.Millisecond)
        }
        close(done)
    }()
    
    <-done
}

// 2. 交替打印
func alternatePrint() {
    ch1 := make(chan struct{})
    ch2 := make(chan struct{})
    done := make(chan struct{})
    
    // 打印奇数
    go func() {
        for i := 1; i <= 100; i += 2 {
            <-ch1
            fmt.Printf("Goroutine1: %d\n", i)
            ch2 <- struct{}{}
        }
    }()
    
    // 打印偶数
    go func() {
        for i := 2; i <= 100; i += 2 {
            <-ch2
            fmt.Printf("Goroutine2: %d\n", i)
            if i < 100 {
                ch1 <- struct{}{}
            } else {
                close(done)
            }
        }
    }()
    
    ch1 <- struct{}{}  // 启动
    <-done
}

// 3. 超时处理
func doWithTimeout(fn func() string, timeout time.Duration) (string, error) {
    result := make(chan string, 1)
    
    go func() {
        result <- fn()
    }()
    
    select {
    case res := <-result:
        return res, nil
    case <-time.After(timeout):
        return "", fmt.Errorf("执行超时")
    }
}

func main() {
    fmt.Println("=== 生产者-消费者 ===")
    producerConsumer()
    
    fmt.Println("\n=== 交替打印 ===")
    alternatePrint()
    
    fmt.Println("\n=== 超时处理 ===")
    result, err := doWithTimeout(func() string {
        time.Sleep(500 * time.Millisecond)
        return "完成"
    }, 1*time.Second)
    
    if err != nil {
        fmt.Println("错误:", err)
    } else {
        fmt.Println("结果:", result)
    }
}

Channel是Go并发编程的核心,下一节学习包管理!

最近更新: 2025/12/27 13:26
Contributors: 王长安
Prev
Goroutine
Next
包管理