如何收听N频道? (dynamicselect语句)

开始执行两个goroutines的无限循环,我可以使用下面的代码:

收到味精后,将开始一个新的goroutine,并永远继续下去。

c1 := make(chan string) c2 := make(chan string) go DoShit(c1, 5) go DoShit(c2, 2) for ; true; { select { case msg1 := <-c1: fmt.Println("received ", msg1) go DoShit(c1, 1) case msg2 := <-c2: fmt.Println("received ", msg2) go DoShit(c2, 9) } } 

现在我想对N个例程有相同的行为,但在这种情况下select语句将如何查找?

这是我开始使用的代码,但是我很困惑如何编写select语句

 numChans := 2 //I keep the channels in this slice, and want to "loop" over them in the select statemnt var chans = [] chan string{} for i:=0;i<numChans;i++{ tmp := make(chan string); chans = append(chans, tmp); go DoShit(tmp, i + 1) //How shall the select statment be coded for this case? for ; true; { select { case msg1 := <-c1: fmt.Println("received ", msg1) go DoShit(c1, 1) case msg2 := <-c2: fmt.Println("received ", msg2) go DoShit(c2, 9) } } 

您可以使用reflection包中的Selectfunction来做到这一点:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

select执行由案例列表描述的select操作。 与Go select语句一样,它会阻塞,直到至less有一个情况可以继续,进行统一的伪随机select,然后执行该情况。 它返回所选情况的索引,如果是接收操作,则返回接收到的值,并返回一个布尔值,指示该值是否与通道上的发送相对应(与通道已closures时接收到的零值相对)。

传入一组SelectCase结构体, SelectCase结构体标识要select的通道,操作的方向以及在发送操作的情况下发送的值。

所以你可以做这样的事情:

 cases := make([]reflect.SelectCase, len(chans)) for i, ch := range chans { cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} } chosen, value, ok := reflect.Select(cases) # ok will be true if the channel has not been closed. ch := chans[chosen] msg := value.String() 

你可以在这里尝试一个更丰富的例子: http : //play.golang.org/p/8zwvSk4kjx

您可以通过将每个通道封装在一个将“消息”转发到共享“聚合”通道的goroutine中来实现。 例如:

 agg := make(chan string) for _, ch := range chans { go func(c chan string) { for msg := range c { agg <- msg } }(ch) } select { case msg <- agg: fmt.Println("received ", msg) } 

如果您需要知道消息源自哪个通道,则可以在将其转发到聚合通道之前将其包含在结构中,并附带任何额外的信息。

在我的(有限的)testing中,这种方法大大超出了使用reflection包的性能:

 $ go test dynamic_select_test.go -test.bench=. ... BenchmarkReflectSelect 1 5265109013 ns/op BenchmarkGoSelect 20 81911344 ns/op ok command-line-arguments 9.463s 

基准代码在这里

为了扩大对先前答案的一些评论并在此提供更清楚的比较,给出了相同的input,从中读取的一片通道以及用于调用每个值的function的两种方法的示例,这些值也需要知道渠道的价值来自。

这些方法之间有三个主要区别:

  • 复杂。 虽然这可能部分地成为读者的偏好,但我发现渠道方式更具有惯用,直接和可读性。

  • 性能。 在我的Xeon amd64系统上,goroutines +通道执行的reflection解决scheme大约有两个数量级(一般来说,Go中的reflection通常比较慢,只能在绝对需要时使用)。 当然,如果处理结果的函数或input通道中的值写入有明显的延迟,那么这个性能差异很容易变得微不足道。

  • 阻塞/缓冲语义。 这个的重要性取决于用例。 大多数情况下,它或者不重要,或者goroutine合并解决scheme中的轻微额外缓冲可能有助于吞吐量。 然而,如果希望拥有只有一个写入器被解锁的语义,并且任何其他写入器被解除封锁之前完全处理了它的值,那么只能通过reflection解决scheme来实现。

请注意,如果发送通道的“id”不是必需的,或者源通道永远不会closures,那么这两种方法都可以被简化。

Goroutine合并频道:

 // Process1 calls `fn` for each value received from any of the `chans` // channels. The arguments to `fn` are the index of the channel the // value came from and the string value. Process1 returns once all the // channels are closed. func Process1(chans []<-chan string, fn func(int, string)) { // Setup type item struct { int // index of which channel this came from string // the actual string item } merged := make(chan item) var wg sync.WaitGroup wg.Add(len(chans)) for i, c := range chans { go func(i int, c <-chan string) { // Reads and buffers a single item from `c` before // we even know if we can write to `merged`. // // Go doesn't provide a way to do something like: // merged <- (<-c) // atomically, where we delay the read from `c` // until we can write to `merged`. The read from // `c` will always happen first (blocking as // required) and then we block on `merged` (with // either the above or the below syntax making // no difference). for s := range c { merged <- item{i, s} } // If/when this input channel is closed we just stop // writing to the merged channel and via the WaitGroup // let it be known there is one fewer channel active. wg.Done() }(i, c) } // One extra goroutine to watch for all the merging goroutines to // be finished and then close the merged channel. go func() { wg.Wait() close(merged) }() // "select-like" loop for i := range merged { // Process each value fn(i.int, i.string) } } 

reflectionselect:

 // Process2 is identical to Process1 except that it uses the reflect // package to select and read from the input channels which guarantees // there is only one value "in-flight" (ie when `fn` is called only // a single send on a single channel will have succeeded, the rest will // be blocked). It is approximately two orders of magnitude slower than // Process1 (which is still insignificant if their is a significant // delay between incoming values or if `fn` runs for a significant // time). func Process2(chans []<-chan string, fn func(int, string)) { // Setup cases := make([]reflect.SelectCase, len(chans)) // `ids` maps the index within cases to the original `chans` index. ids := make([]int, len(chans)) for i, c := range chans { cases[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), } ids[i] = i } // Select loop for len(cases) > 0 { // A difference here from the merging goroutines is // that `v` is the only value "in-flight" that any of // the workers have sent. All other workers are blocked // trying to send the single value they have calculated // where-as the goroutine version reads/buffers a single // extra value from each worker. i, v, ok := reflect.Select(cases) if !ok { // Channel cases[i] has been closed, remove it // from our slice of cases and update our ids // mapping as well. cases = append(cases[:i], cases[i+1:]...) ids = append(ids[:i], ids[i+1:]...) continue } // Process each value fn(ids[i], v.String()) } } 

[ Go游乐场的完整代码]