这是Go的一个惯用的工作线程池吗?
我试图用goroutines写一个简单的工作池。
- 我写的地道代码是惯用的吗? 如果不是,那么应该改变什么?
- 我希望能够将工作线程的最大数量设置为5,并且阻止,直到一个工作人员变得可用,如果5个人都忙的话。 我怎样才能把这个扩展到最多只有5名工人? 我是否产生了静态的5个
work_channel
,并给每个work_channel
?
码:
package main import ( "fmt" "math/rand" "sync" "time" ) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) o <- work + fmt.Sprintf("-%dms", sleepMs) } func main() { var work_channel = make(chan string) var results_channel = make(chan string) // create goroutine per item in work_channel go func() { var c = 0 var wg sync.WaitGroup for work := range work_channel { wg.Add(1) go worker(fmt.Sprintf("%d", c), work, results_channel, &wg) c++ } wg.Wait() fmt.Println("closing results channel") close(results_channel) }() // add work to the work_channel go func() { for c := 'a'; c < 'z'; c++ { work_channel <- fmt.Sprintf("%c", c) } close(work_channel) fmt.Println("sent work to work_channel") }() for x := range results_channel { fmt.Printf("result: %s\n", x) } }
你的解决scheme在任何意义上都不是一个工作程序池:你的代码不会限制并发的goroutine,也不会“重用”goroutines(当收到一个新的job时,它总会启动一个新的goroutines)。
生产者 – 消费者模式
正如我在Bruteforce MD5密码破解者发布的 ,你可以使用生产者 – 消费者模式 。 你可以有一个指定的生产者例程来生成作业(要做/计算的事情),并将它们发送到工作频道。 你可以有一个固定的消费者例程库(例如5个),它将循环传递作业的通道,并且每个会执行/完成接收到的作业。
当生产和发送所有工作时, 生产者协调员可以简单地closuresjobs
渠道,正确地告知消费者没有更多的工作要来临。 通道上的for ... range
构造处理“closures”事件并正确终止。 请注意,closures频道之前发送的所有作业仍将被传送。
这将导致一个干净的devise,会导致固定的(但是任意的)goroutines数量,并且总是会使用100%的CPU(如果goroutines的数量大于#个CPU核心)。 它还具有可以通过正确select信道容量(缓冲信道)和消费者门户的数量来“节stream”的优点。
请注意,这个模型有一个指定的生产者goroutine不是强制性的。 你也可以有多个goroutines来生成作业,但是你必须同步它们,以便在所有的生产者例程完成作业时closuresjobs
通道 – 否则尝试在jobs
通道已经closures时发送另一个作业,导致运行时恐慌。 通常生产工作便宜,而且生产速度要快得多,所以这个模型可以在一个办公室生产,而很多人正在消费/执行,这在实践中是很好的。
处理结果:
如果工作有成果,您可以select有一个指定的结果渠道,其结果可以交付(“送回”),或者您可以select在工作完成/完成时在消费者处理结果。 后者甚至可以通过具有处理结果的“callback”函数来实现。 重要的是结果是可以独立处理还是需要合并(例如map-reduce框架)或者是合并的。
如果使用results
通道,还需要一个从其接收值的goroutine,防止消费者被阻止(如果results
缓冲区被填充,则会发生)。
results
通道
而不是发送简单的string
值作为工作和结果,我会创build一个包装types,可以容纳任何额外的信息,所以它更加灵活:
type Job struct { Id int Work string Result string }
请注意, Job
结构也包装结果,所以当我们发回结果时,它也包含原始的Job
作为上下文 – 通常非常有用 。 另外请注意,只在通道上发送指针( *Job
)而不是Job
值是有利的,所以不需要对Job
进行“无数次”的拷贝,而且Job
结构体的大小变得不相关。
下面是这个生产者 – 消费者如何看起来像:
我会使用2个sync.WaitGroup
值,他们的angular色将遵循:
var wg, wg2 sync.WaitGroup
生产者负责生成要执行的工作:
func produce(jobs chan<- *Job) { defer wg.Done() // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) }
完成后(没有更多的工作), jobs
渠道closures,这表明消费者没有更多的工作将到达。
请注意, produce()
将jobs
通道视为仅发送 ,因为生产者只需要这样做:在其上发送作业(除了closures作业,但在只发送通道上也是如此)。 在生产者意外收到将编译时间错误(编译时提前检测)。
消费者的责任是只要能够接收到工作就可以得到工作,并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs) results <- job } }
请注意, consume()
将jobs
通道视为仅接收 ; 消费者只需要从中接收 。 类似地, results
频道仅发送给消费者。
另外请注意, results
通道不能在这里closures,因为有多个消费者例程,只有第一个试图closures它会成功,进一步会导致运行时恐慌! results
通道可以(必须)在所有的用户程序结束后closures,因为那样我们可以确定在results
通道上不会有更多的值(结果)。
我们有需要分析的结果:
func analyze(results <-chan *Job) { defer wg2.Done() for job := range results { fmt.Printf("result: %s\n", job.Result) } }
正如你所看到的,只要他们可能会来,也会得到结果(直到results
通道被closures)。 分析仪的results
通道只能接收 。
请注意使用通道types:只要使用通道types,只需使用单向通道types即可在编译时尽早检测并防止错误。 只有使用双向通道types,如果你确实需要两个方向。
这就是所有这些都粘在一起的:
func main() { jobs := make(chan *Job, 100) // Buffered channel results := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs, results) } // Start producing go produce(jobs) // Start analyzing: wg2.Add(1) go analyze(results) wg.Wait() // Wait all consumers to finish processing jobs // All jobs are processed, no more values will be sent on results: close(results) wg2.Wait() // Wait analyzer to analyze all results }
示例输出:
这是一个输出示例:
正如你所看到的,结果在即将到来的所有工作之前就会得到分析:
worker #1 received: 'e', sleep 81ms worker #2 received: 'b', sleep 887ms worker #3 received: 'c', sleep 847ms worker #4 received: 'd', sleep 59ms worker #0 received: 'a', sleep 81ms worker #4 received: 'f', sleep 318ms result: d-59ms worker #0 received: 'h', sleep 540ms worker #1 received: 'g', sleep 425ms result: e-81ms result: a-81ms worker #4 received: 'i', sleep 456ms result: f-318ms worker #1 received: 'j', sleep 300ms result: g-425ms worker #0 received: 'k', sleep 694ms result: h-540ms worker #1 received: 'l', sleep 511ms result: j-300ms worker #4 received: 'm', sleep 162ms result: i-456ms worker #3 received: 'n', sleep 89ms result: c-847ms worker #2 received: 'o', sleep 728ms result: b-887ms worker #3 received: 'p', sleep 274ms result: n-89ms worker #4 received: 'q', sleep 211ms result: m-162ms worker #4 received: 'r', sleep 445ms result: q-211ms result: p-274ms worker #3 received: 's', sleep 237ms worker #0 received: 't', sleep 106ms result: k-694ms worker #1 received: 'u', sleep 495ms result: l-511ms result: t-106ms worker #0 received: 'v', sleep 466ms worker #3 received: 'w', sleep 528ms result: s-237ms worker #2 received: 'x', sleep 258ms result: o-728ms result: r-445ms worker #4 received: 'y', sleep 47ms worker #4 received: 'z', sleep 947ms result: y-47ms result: u-495ms result: x-258ms result: v-466ms result: w-528ms
尝试Go游乐场上的完整应用程序。
没有results
频道
如果我们不使用results
通道,代码会大大简化,但消费者例程会马上处理结果(在我们的例子中打印)。 在这种情况下,我们不需要2个sync.WaitGroup
值(第二个只需要等待分析器完成)。
没有results
渠道,完整的解决scheme是这样的:
var wg sync.WaitGroup type Job struct { Id int Work string } func produce(jobs chan<- *Job) { defer wg.Done() // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } func consume(id int, jobs <-chan *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs)) } } func main() { jobs := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs) } // Start producing go produce(jobs) wg.Wait() // Wait all consumers to finish processing jobs }
输出与results
通道“相似”(但当然执行/完成顺序是随机的)。
在Go Playground上试试这个变种。
您可以实现计数信号来限制goroutine并发。
var tokens = make(chan struct{}, 20) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() tokens <- struct{}{} // acquire a token before performing work sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) <-tokens // release the token o <- work + fmt.Sprintf("-%dms", sleepMs) }
这是用来限制工人数量的一般devise。 你当然可以改变释放/获取令牌的位置,以适应你的代码。