多个goroutines在一个频道上收听
我有多个goroutines试图同时在同一个通道上接收。 似乎最后一个开始在渠道上接收的goroutine得到了价值。 这是在语言规范的某个地方还是未定义的行为?
c := make(chan string) for i := 0; i < 5; i++ { go func(i int) { <-c c <- fmt.Sprintf("goroutine %d", i) }(i) } c <- "hi" fmt.Println(<-c)
输出:
goroutine 4
在操场上的例子
编辑:
我只是意识到这比我想象的更复杂。 消息传遍所有的goroutines。
c := make(chan string) for i := 0; i < 5; i++ { go func(i int) { msg := <-c c <- fmt.Sprintf("%s, hi from %d", msg, i) }(i) } c <- "original" fmt.Println(<-c)
输出:
original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4
在操场上的例子
是的,这很复杂,但是有一些经验法则可以让事情变得更直接。
- 更喜欢使用正式的参数来传递你去传递的渠道 – 而不是访问全球范围内的渠道。 您可以通过这种方式获得更多的编译器检查,而且模块化也更好。
- 避免在特定的去程序 (包括“主” 程序)中在同一频道上进行阅读和写作 。 否则,死锁是一个更大的风险。
这是您的程序的替代版本,应用这两个准则。 这个案例在一个频道上演示了许多作者和一个读者:
c := make(chan string) for i := 1; i <= 5; i++ { go func(i int, co chan<- string) { for j := 1; j <= 5; j++ { co <- fmt.Sprintf("hi from %d.%d", i, j) } }(i, c) } for i := 1; i <= 25; i++ { fmt.Println(<-c) }
http://play.golang.org/p/quQn7xePLw
它创build了五个写入单个通道的例程,每个写入五次。 主程序读取所有二十五条消息 – 您可能会注意到它们出现的顺序通常不是顺序的(即并发性很明显)。
这个例子演示了Go频道的一个特点:有可能有多个作家共享一个频道; Go将自动交错消息。
一个频道上的一个作者和多个阅读者也是如此,如第二个例子所示:
c := make(chan int) var w sync.WaitGroup w.Add(5) for i := 1; i <= 5; i++ { go func(i int, ci <-chan int) { j := 1 for v := range ci { time.Sleep(time.Millisecond) fmt.Printf("%d.%d got %d\n", i, j, v) j += 1 } w.Done() }(i, c) } for i := 1; i <= 25; i++ { c <- i } close(c) w.Wait()
第二个例子包括对主要例程的一个等待,否则这个例程会立即退出,并导致其他五个例程提前终止(感谢olov进行这个更正) 。
在这两个例子中,都不需要缓冲。 仅将缓冲视为性能增强器通常是一个很好的原则。 如果你的程序在没有缓冲区的情况下不会死锁,它也不会与缓冲区发生死锁(但反过来并不总是对的)。 所以,作为另一个经验法则,开始没有缓冲,然后根据需要添加它 。
晚回复,但我希望这将有助于其他人像长投票,“全球”button,广播给大家?
Effective Go解释了这个问题:
接收器总是阻塞,直到有数据要接收。
这意味着你不能有超过1个goroutine监听1个频道,并期望所有goroutine接收相同的值。
运行这个代码示例 。
package main import "fmt" func main() { c := make(chan int) for i := 1; i <= 5; i++ { go func(i int) { for v := range c { fmt.Printf("count %d from goroutine #%d\n", v, i) } }(i) } for i := 1; i <= 25; i++ { c<-i } close(c) }
即使有5个goroutine正在收听频道,您也不会看到“count 1”。 这是因为当第一个goroutine阻塞通道时,所有其他的goroutine必须排队等候。 当通道被解除阻塞时,计数已经被接收并从通道中移除,所以下一个goroutine在行中得到下一个计数值。
这很复杂。
另外,看看GOMAXPROCS = NumCPU+1
会发生什么。 例如,
package main import ( "fmt" "runtime" ) func main() { runtime.GOMAXPROCS(runtime.NumCPU() + 1) fmt.Print(runtime.GOMAXPROCS(0)) c := make(chan string) for i := 0; i < 5; i++ { go func(i int) { msg := <-c c <- fmt.Sprintf("%s, hi from %d", msg, i) }(i) } c <- ", original" fmt.Println(<-c) }
输出:
5, original, hi from 0, hi from 4
而且,看看缓冲的频道会发生什么。 例如,
package main import "fmt" func main() { c := make(chan string, 5+1) for i := 0; i < 5; i++ { go func(i int) { msg := <-c c <- fmt.Sprintf("%s, hi from %d", msg, i) }(i) } c <- "original" fmt.Println(<-c) }
输出:
original
你也应该能够解释这些情况。
我研究了现有的解决scheme,并创build了简单的广播库https://github.com/grafov/bcast 。
group := bcast.NewGroup() // you created the broadcast group go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members member := group.Join() // then you join member(s) from other goroutine(s) member.Send("test message") // or send messages of any type to the group member1 := group.Join() // then you join member(s) from other goroutine(s) val := member1.Recv() // and for example listen for messages
对于多个goroutine在一个频道上收听,是的,这是可能的。 关键是消息本身,你可以定义一些这样的消息:
package main import ( "fmt" "sync" ) type obj struct { msg string receiver int } func main() { ch := make(chan *obj) // both block or non-block are ok var wg sync.WaitGroup receiver := 25 // specify receiver count sender := func() { o := &obj { msg: "hello everyone!", receiver: receiver, } ch <- o } recv := func(idx int) { defer wg.Done() o := <-ch fmt.Printf("%d received at %d\n", idx, o.receiver) o.receiver-- if o.receiver > 0 { ch <- o // forward to others } else { fmt.Printf("last receiver: %d\n", idx) } } go sender() for i:=0; i<reciever; i++ { wg.Add(1) go recv(i) } wg.Wait() }
输出是随机的:
5 received at 25 24 received at 24 6 received at 23 7 received at 22 8 received at 21 9 received at 20 10 received at 19 11 received at 18 12 received at 17 13 received at 16 14 received at 15 15 received at 14 16 received at 13 17 received at 12 18 received at 11 19 received at 10 20 received at 9 21 received at 8 22 received at 7 23 received at 6 2 received at 5 0 received at 4 1 received at 3 3 received at 2 4 received at 1 last receiver 4