从Linux上的命令队列并行处理(bash,python,ruby等等)

我有一个200个命令的列表/队列,我需要在Linux服务器的shell中运行。

我只想一次运行最多10个进程(从队列中)。 一些进程需要几秒钟才能完成,其他进程需要更长的时间。

当一个进程完成后,我希望下一个命令从队列中“popup”并执行。

有没有人有解决这个问题的代码?

进一步阐述:

有200件工作需要完成,在某种排队中。 我最多只想做10件工作。 当一个线程完成一项工作时,它应该询问队列中的下一项工作。 如果队列中没有更多的工作,线程就会死亡。 当所有的线程都已经死亡,这意味着所有的工作已经完成。

我试图解决的实际问题是使用imapsync将200邮箱从旧邮件服务器同步到新的邮件服务器。 一些用户有大邮箱,需要很长时间tto同步,其他用户有很小的邮箱和快速同步。

我想可以使用make和make -j xx命令来做到这一点。

也许这样的makefile

 all : usera userb userc.... usera: imapsync usera userb: imapsync userb .... 

make -j 10 -f makefile

在shell中,可以使用xargs排队并行命令处理。 例如,平时睡3次,每次睡1秒,共执行10次睡眠

 echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _ 

它会共睡4秒。 如果你有一个名字列表,并且想把名字传给执行的命令,再次执行3个命令并行执行

 cat names | xargs -n1 -P3 process_name 

会执行命令process_name aliceprocess_name bob等。

并行是为了这个目的而精心制作的。

 cat userlist | parallel imapsync 

Parallel与其他解决scheme相比,其中一个优点是它确保输出不混合。 在Parallel中做traceroute工作正常,例如:

 (echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute 

对于这种工作PPSS写的是:并行处理shell脚本。 谷歌这个名字,你会发现它,我不会linkpam。

GNU make(也可能是其他的实现)具有-j参数,它控制一次运行多less个作业。 当一个工作完成后,将开始另一个。

那么,如果他们在很大程度上是相互独立的,我会认为:

 Initialize an array of jobs pending (queue, ...) - 200 entries Initialize an array of jobs running - empty while (jobs still pending and queue of jobs running still has space) take a job off the pending queue launch it in background if (queue of jobs running is full) wait for a job to finish remove from jobs running queue while (queue of jobs is not empty) wait for job to finish remove from jobs running queue 

请注意,主循环中的尾部testing意味着,如果“作业运行队列”在while循环迭代时有空间,可以防止循环过早终止。 我认为这个逻辑是正确的。

我可以很容易地看到如何在C中做到这一点 – 在Perl中也不会那么困难(因此在其他脚本语言(Python,Ruby,Tcl等)中并不太难)。 我一点也不确定我是否想在shell中执行它 – shell中的wait命令等待所有的孩子终止,而不是让某个孩子终止。

在Python中,你可以尝试:

 import Queue, os, threading # synchronised queue queue = Queue.Queue(0) # 0 means no maximum size # do stuff to initialise queue with strings # representing os commands queue.put('sleep 10') queue.put('echo Sleeping..') # etc # or use python to generate commands, eg # for username in ['joe', 'bob', 'fred']: # queue.put('imapsync %s' % username) def go(): while True: try: # False here means no blocking: raise exception if queue empty command = queue.get(False) # Run command. python also has subprocess module which is more # featureful but I am not very familiar with it. # os.system is easy :-) os.system(command) except Queue.Empty: return for i in range(10): # change this to run more/fewer threads threading.Thread(target=go).start() 

未经testing…

(当然,python本身是单线程的,但是在等待IO的时候仍然可以得到multithreading的好处)。

https://savannah.gnu.org/projects/parallel(gnu parallel)和pssh可能会有帮助。

如果你打算使用Python,我推荐使用Twisted 。

特别扭曲的亚军 。

Python的多处理模块似乎很适合你的问题。 这是一个支持按stream程进行线程化的高级包。

在zsh中使用简单的函数在不超过4个子壳中并行化作业,使用/ tmp中的locking文件。

唯一不重要的部分是第一次testing中的glob标志:

  • #q :在testing中启用文件名#q
  • [4] :仅返回第四个结果
  • N :忽略空结果错误

将它转换为posix应该很容易,不过会稍微冗长些。

不要忘记用\"来摆脱工作中的任何引用。

 #!/bin/zsh setopt extendedglob para() { lock=/tmp/para_$$_$((paracnt++)) # sleep as long as the 4th lock file exists until [[ -z /tmp/para_$$_*(#q[4]N) ]] { sleep 0.1 } # Launch the job in a subshell ( touch $lock ; eval $* ; rm $lock ) & # Wait for subshell start and lock creation until [[ -f $lock ]] { sleep 0.001 } } para "print A0; sleep 1; print Z0" para "print A1; sleep 2; print Z1" para "print A2; sleep 3; print Z2" para "print A3; sleep 4; print Z3" para "print A4; sleep 3; print Z4" para "print A5; sleep 2; print Z5" # wait for all subshells to terminate wait 

你能平行阐述你的意思吗? 这听起来像你需要在队列中实现某种locking,所以你的input不被select两次,等命令只运行一次。

大多数队列系统都会作弊 – 他们只是写一个巨型待办事项列表,然后select例如十个项目,工作,然后select下一个项目。 没有并行化。

如果你提供更多的细节,我相信我们可以帮助你。