Skip to content
On this page

Go 学习笔记(8):生产者消费者模型

单生产者多消费者

当场景为单生产者多消费者的时候,生产者执行结束就可以直接关闭通道,此时消费者自动从通道中拿数据,直到通道为空就退出,不会形成阻塞。

go
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	// 定义共享数据的通道
	itemCh = make(chan int, 20)
	cwg    sync.WaitGroup
)

func producter(ch chan int) {
	for i := 0; i < 10; i++ {
		time.Sleep(time.Second) // 模拟真实场景的处理时间
		ch <- i
		fmt.Printf("set num %d to ch\n", i)
	}
	close(ch)
}

func consumer(ch chan int, name string) {
	defer cwg.Done()
	for n := range ch {
		time.Sleep(time.Second * 2) // 模拟真实场景的处理时间
		fmt.Printf("%s:get num %d from ch\n", name, n)
	}
}

func main() {
	go producter(itemCh)

	// 启动多个消费者
	for i := 0; i < 5; i++ {
		go consumer(itemCh, fmt.Sprintf("c-%d", i))
		cwg.Add(1)
	}

	cwg.Wait()

	fmt.Println("main done!")
}

多生产者多消费者

下面是一个多生产者和多消费者的场景例子,生产者数量和消费者数量都是不定的,此时需要考虑何时关闭通道,比较好的时机是利用计数器,当生产者的计数器清理则表示所有生产者都执行结束,此时就可以安全的关闭通道。这个例子可以作为多线程爬虫的标准写法参考:

go
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	// 定义共享数据的通道
	itemCh = make(chan int, 20)
	// 分别定义生产者和消费者的计数器
	pwg sync.WaitGroup
	cwg sync.WaitGroup
)

func producter(num int, ch chan int, name string) {
	defer pwg.Done()
	time.Sleep(time.Second) // 模拟真实场景的处理时间
	ch <- num
	fmt.Printf("%s:set num %d to ch\n", name, num)
}

func consumer(ch chan int, name string) {
	defer cwg.Done()
	for n := range ch {
		time.Sleep(time.Second * 2) // 模拟真实场景的处理时间
		fmt.Printf("%s:get num %d from ch\n", name, n)
	}
}

func main() {
	// 启动多个生产者
	for i := 0; i < 10; i++ {
		go producter(i, itemCh, fmt.Sprintf("p-%d", i))
		pwg.Add(1)
	}

	// 启动多个消费者
	for i := 0; i < 5; i++ {
		go consumer(itemCh, fmt.Sprintf("c-%d", i))
		cwg.Add(1)
	}

	// 等待生产者执行结束前必须让消费者和生产者都运行起来
	pwg.Wait()
	close(itemCh) // 等所有生产者执行结束就关闭通道

	cwg.Wait()

	fmt.Println("main done!")
}

注意

在多生产者模型中,如果不及时关闭通道,会导致消费者一直从通道中拿数据,进而导致通道不会被自动回收,消费者会一直阻塞。

参考文档