你将如何定义在Golang一次执行的goroutines池?

TL; TR:请到最后一部分,告诉我你将如何解决这个问题。

我今天早上从Python开始使用Golang。 我想多次调用一个来自Go的闭源代码,带有一些并发性,带有不同的命令行参数。 我的结果代码工作得很好,但我想获得您的input,以改善它。 由于我处于早期学习阶段,所以我也会解释一下我的工作stream程。

为了简单起见,这里假设这个“外部闭源程序”是zenity ,一个Linux命令行工具,可以从命令行显示graphics消息框。

从Go调用可执行文件

所以,在Go中,我会这样做:

 package main import "os/exec" func main() { cmd := exec.Command("zenity", "--info", "--text='Hello World'") cmd.Run() } 

这应该是正确的。 请注意.Run().Start()后跟.Wait()的function相同。 这很好,但如果我只想执行一次这个程序,整个编程的东西就不值得。 所以让我们多次这样做。

调用可执行文件多次

现在,我有这个工作,我想多次调用我的程序,自定义命令行参数(这里只是为了简单起见)。

 package main import ( "os/exec" "strconv" ) func main() { NumEl := 8 // Number of times the external program is called for i:=0; i<NumEl; i++ { cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'") cmd.Run() } } 

好的,我们做到了! 但是我仍然看不到Go over Python的优点…这段代码实际上是以串行方式执行的。 我有一个多核CPU,我想利用它。 所以让我们添加一些并发的goroutines。

Goroutines,或使我的程序并行的方法

a)第一次尝试:只要添加“去”到处

让我们重写我们的代码,使事情更容易调用和重用,并添加着名的go关键字:

 package main import ( "os/exec" "strconv" ) func main() { NumEl := 8 for i:=0; i<NumEl; i++ { go callProg(i) // <--- There! } } func callProg(i int) { cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'") cmd.Run() } 

没有! 问题是什么? 所有的门厅都立即执行。 我真的不知道为什么zenity没有执行,但是AFAIK,Go程序在zenity外部程序之前退出,甚至可以初始化。 这是通过使用time.Sleep :等待几秒钟就足以让8个zenity的实例启动自己。 我不知道这是否可以被视为一个错误。

更糟的是,真正想要调用的真正的程序需要一段时间来执行。 如果我在4核CPU上并行执行这个程序的8个实例,会浪费一些时间来做很多上下文切换…我不知道Go的exec.Command有多简单,但是exec.Command 将会在8个时间里启动zenity 8个不同的线程。 更糟糕的是,我想要执行这个程序超过10万次。 一次在门厅做所有这些都不会有效。 不过,我想利用我的四核CPU!

b)第二次尝试:使用goroutines池

在线资源倾向于推荐使用sync.WaitGroup来进行这种工作。 这种方法的问题是,你基本上正在处理一批goroutines:如果我创build了4个成员的WaitGroup,Go程序将等待所有 4个外部程序完成,然后调用一批新的4个程序。 这是不高效的:CPU再次被浪费了。

其他一些资源build议使用缓冲通道来完成这项工作:

 package main import ( "os/exec" "strconv" ) func main() { NumEl := 8 // Number of times the external program is called NumCore := 4 // Number of available cores c := make(chan bool, NumCore - 1) for i:=0; i<NumEl; i++ { go callProg(i, c) c <- true // At the NumCoreth iteration, c is blocking } } func callProg(i int, c chan bool) { defer func () {<- c}() cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'") cmd.Run() } 

这看起来很难看。 渠道不是为了这个目的:我正在利用一个副作用。 我喜欢defer的概念,但我讨厌不得不声明一个函数(即使是lambda)来从我创build的虚拟通道中popup一个值。 哦,当然,使用虚拟通道本身就很丑陋。

c)第三次尝试:当所有的孩子都死了的时候死亡

现在我们快完成了。 我只是要考虑到另一个副作用:Go程序在所有的优秀的popup窗口closures之前closures。 这是因为当循环完成时(第8次迭代),没有什么能阻止程序的完成。 这一次, sync.WaitGroup会很有用。

 package main import ( "os/exec" "strconv" "sync" ) func main() { NumEl := 8 // Number of times the external program is called NumCore := 4 // Number of available cores c := make(chan bool, NumCore - 1) wg := new(sync.WaitGroup) wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl) for i:=0; i<NumEl; i++ { go callProg(i, c, wg) c <- true // At the NumCoreth iteration, c is blocking } wg.Wait() // Wait for all the children to die close(c) } func callProg(i int, c chan bool, wg *sync.WaitGroup) { defer func () { <- c wg.Done() // Decrease the number of alive goroutines }() cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'") cmd.Run() } 

完成。

我的问题

  • 你是否知道其他合适的方法来限制一次执行的门厅数量?

我不是指线程; Go如何pipe理内部的goroutines是不相关的。 我的意思是限制一次启动的goroutines的数量: exec.Command每次调用时exec.Command创build一个新的线程,所以我应该控制它的调用次数。

  • 这段代码对你看起来不错吗?
  • 你知道在这种情况下如何避免使用虚拟频道吗?

我无法说服自己,这样的虚拟渠道是要走的路。

我会产生4个工作程序,从一个公共通道读取任务。 比其他人更快的Goroutines(因为他们的安排不同或碰巧得到简单的任务)会比其他人从这个渠道获得更多的任务。 除此之外,我会使用一个sync.WaitGroup等待所有工作人员完成。 剩下的部分就是任务的创build。 你可以在这里看到这个方法的一个示例实现:

 package main import ( "os/exec" "strconv" "sync" ) func main() { tasks := make(chan *exec.Cmd, 64) // spawn four worker goroutines var wg sync.WaitGroup for i := 0; i < 4; i++ { wg.Add(1) go func() { for cmd := range tasks { cmd.Run() } wg.Done() }() } // generate some tasks for i := 0; i < 10; i++ { tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'") } close(tasks) // wait for the workers to finish wg.Wait() } 

可能还有其他可行的方法,但我认为这是一个很容易理解的非常干净的解决scheme。

一个简单的方法来限制(同时执行f() N次,但最大maxConcurrency ),只是一个scheme:

 package main import ( "sync" ) const maxConcurrency = 4 // for example var throttle = make(chan int, maxConcurrency) func main() { const N = 100 // for example var wg sync.WaitGroup for i := 0; i < N; i++ { throttle <- 1 // whatever number wg.Add(1) go f(i, &wg, throttle) } wg.Wait() } func f(i int, wg *sync.WaitGroup, throttle chan int) { defer wg.Done() // whatever processing println(i) <-throttle } 

操场

我不会叫throttle通道“哑”。 恕我直言,这是一个优雅的方式(这不是我的发明,当然),如何限制并发。

顺便说一句:请注意,你忽略了从cmd.Run()返回的错误。

试试这个: https : //github.com/korovkin/limiter

  limiter := NewConcurrencyLimiter(10) limiter.Execute(func() { zenity(...) }) limiter.Wait()