并行化最大进程数量的Bash脚本
比方说,我有一个在Bash循环:
for foo in `some-command` do do-something $foo done
do-something
是CPU绑定,我有一个不错的shiny4核心处理器。 我希望能够一次运行多达4 do-something
。
天真的做法似乎是:
for foo in `some-command` do do-something $foo & done
这样会同时运行所有的 do-something
,但是有一些缺点,主要是do-something也可能有一些重要的I / O,一次执行可能会减慢一点。 另一个问题是这个代码块会立即返回,所以当所有的do-something
完成时都无法做其他的工作。
你将如何编写这个循环,所以总是有X个do-something
运行?
取决于你想要做什么xargs也可以帮助(在这里:用pdf2ps转换文档):
cpus=$( ls -d /sys/devices/system/cpu/cpu[[:digit:]]* | wc -w ) find . -name \*.pdf | xargs --max-args=1 --max-procs=$cpus pdf2ps
从文档:
--max-procs=max-procs -P max-procs Run up to max-procs processes at a time; the default is 1. If max-procs is 0, xargs will run as many processes as possible at a time. Use the -n option with -P; otherwise chances are that only one exec will be done.
使用GNU并行http://www.gnu.org/software/parallel/你可以写:;
some-command | parallel do-something
GNU Parallel还支持在远程计算机上运行作业。 这将在远程计算机上每CPU核心运行一个 – 即使它们具有不同数量的核心:
some-command | parallel -S server1,server2 do-something
一个更高级的例子:在这里,我们列出了我们希望my_script运行的文件。 文件有扩展名(也许是.jpeg)。 我们希望将my_script的输出放在basename.out文件的旁边(例如foo.jpeg – > foo.out)。 我们希望为计算机的每个内核运行一次my_script,我们也想在本地计算机上运行它。 对于远程计算机,我们希望将要处理的文件传输到给定的计算机。 当my_script完成时,我们希望将foo.out传回来,然后我们要从远程计算机中删除foo.jpeg和foo.out:
cat list_of_files | \ parallel --trc {.}.out -S server1,server2,: \ "my_script {} > {.}.out"
GNU并行可以确保每个作业的输出不混合,所以你可以使用输出作为另一个程序的input:
some-command | parallel do-something | postprocess
有关更多示例,请参阅video: https : //www.youtube.com/playlist?list = PL284C9FF2488BC6D1
maxjobs = 4 parallelize(){ 而[$#-gt 0]; 做 jobcnt =(`jobs -p`) if [$ {#jobcnt [@]} -lt $ maxjobs]; 然后 做某事$ 1& 转移 其他 睡1 科幻 DONE 等待 } 并行化arg1 arg2“5个参数到第三个工作”arg4 ...
使用Makefile,然后使用make -jX
指定同时作业的数量,其中X是一次运行的作业数量,而不是简单的bash。
或者可以使用wait
(“ man wait
”):启动几个subprocess,调用wait
– 当subprocess结束时它会退出。
maxjobs = 10 foreach line in `cat file.txt` { jobsrunning = 0 while jobsrunning < maxjobs { do job & jobsrunning += 1 } wait } job ( ){ ... }
如果您需要存储作业的结果,则将其结果分配给一个variables。 wait
之后,只需检查variables包含的内容。
也许尝试一个并行化工具,而不是重写循环? 我是xjobs的忠实粉丝。 我一直使用xjobs在我们的networking上批量复制文件,通常是在build立一个新的数据库服务器的时候。 http://www.maier-komor.de/xjobs.html
这里可以插入.bashrc并用于日常的一个class轮的替代解决scheme:
function pwait() { while [ $(jobs -p | wc -l) -ge $1 ]; do sleep 1 done }
要使用它,所有人必须做的就是在作业和一个pwait调用之后,参数给出并行进程的数量:
for i in *; do do_something $i & pwait 10 done
用wait
而不是忙于等待jobs -p
的输出会更好,但似乎没有一个明显的解决scheme,等待任何给定的作业完成而不是全部完成。
在bash
这样做可能是不可能的,你可以很容易地做一个半正确的select。 bstark
给了一个公平的正确的接近,但他有以下缺陷:
- 分词:你不能传递任何在其参数中使用以下任何字符的作业:空格,制表符,换行符,星号,问号。 如果你这样做,事情可能会意外地中断。
- 它依赖于脚本的其余部分来不做任何背景。 如果你这样做,或者后来你添加了一些东西在后台发送的脚本,因为你忘了你不允许使用后台工作,因为他的片段,事情会打破。
另一个没有这些缺陷的近似值如下:
scheduleAll() { local job i=0 max=4 pids=() for job; do (( ++i % max == 0 )) && { wait "${pids[@]}" pids=() } bash -c "$job" & pids+=("$!") done wait "${pids[@]}" }
请注意,这一个很容易适应,也检查每个作业结束的退出代码,所以你可以警告用户,如果一个工作失败或根据失败的工作量或什么设置退出代码scheduleAll
。
这个代码的问题就是:
- 它一次安排四个(在这种情况下)工作,然后等待所有四个工作结束。 有些可能会比其他的更早完成,这将导致下一批四个作业等待,直到上一批的最长时间完成。
解决这个最后一个问题的解决scheme将不得不使用kill -0
来轮询是否有任何进程已经消失,而不是wait
并安排下一个任务。 然而,这引出了一个小小的新问题:在工作结束之间存在竞争条件,并且kill -0
检查是否结束。 如果工作结束了,系统上的另一个进程同时启动,随机的PID恰好是刚刚完成的工作,那么kill -0
将不会注意到你的工作已经完成,事情将会再次打破。
bash
不可能有完美的解决scheme。
如果您熟悉make
命令,大多数情况下您可以将您想要作为makefile运行的命令列表表示出来。 例如,如果您需要在文件* .input上运行$ SOME_COMMAND,而每个文件生成* .output,则可以使用makefile
INPUT = a.input b.input OUTPUT = $(INPUT:.input = .output) %。输出input $(SOME_COMMAND)$ <$ @ 全部:$(OUTPUT)
然后就跑了
make -j <NUMBER>
并行运行最多NUMBER个命令。
我工作的项目使用wait命令来控制并行shell(实际上是ksh)进程。 为了解决您对IO的担忧,在现代操作系统上,并行执行可能会提高效率。 如果所有进程正在读取磁盘上的同一个块,则只有第一个进程必须碰到物理硬件。 其他进程通常可以从内存中的操作系统的磁盘caching中检索该块。 显然,从内存中读取比从磁盘读取快几个数量级。 而且,这个好处不需要编码改变。
函数为bash:
parallel () { awk "BEGIN{print \"all: ALL_TARGETS\\n\"}{print \"TARGET_\"NR\":\\n\\t@-\"\$0\"\\n\"}END{printf \"ALL_TARGETS:\";for(i=1;i<=NR;i++){printf \" TARGET_%d\",i};print\"\\n\"}" | make $@ -f - all }
使用:
cat my_commands | parallel -j 4
这对于大多数目的来说可能是足够好的,但并不是最佳的。
#!/bin/bash n=0 maxjobs=10 for i in *.m4a ; do # ( DO SOMETHING ) & # limit jobs if (( $(($((++n)) % $maxjobs)) == 0 )) ; then wait # wait until all have finished (not optimal, but most times good enough) echo $n wait fi done
你可以使用一个简单的嵌套for循环(用下面的N和Mreplace适当的整数):
for i in {1..N}; do (for j in {1..M}; do do_something; done & ); done
这将在M轮中执行do_somethingN * M次,每轮执行N个作业并行。 您可以使N等于您拥有的CPU数量。
这是我如何设法解决这个问题在bash脚本中:
#! /bin/bash MAX_JOBS=32 FILE_LIST=($(cat ${1})) echo Length ${#FILE_LIST[@]} for ((INDEX=0; INDEX < ${#FILE_LIST[@]}; INDEX=$((${INDEX}+${MAX_JOBS})) )); do JOBS_RUNNING=0 while ((JOBS_RUNNING < MAX_JOBS)) do I=$((${INDEX}+${JOBS_RUNNING})) FILE=${FILE_LIST[${I}]} if [ "$FILE" != "" ];then echo $JOBS_RUNNING $FILE ./M22Checker ${FILE} & else echo $JOBS_RUNNING NULL & fi JOBS_RUNNING=$((JOBS_RUNNING+1)) done wait done
我的解决scheme始终保持运行的给定数量的进程,保持跟踪错误和处理不可中断/僵尸进程:
function log { echo "$1" } # Take a list of commands to run, runs them sequentially with numberOfProcesses commands simultaneously runs # Returns the number of non zero exit codes from commands function ParallelExec { local numberOfProcesses="${1}" # Number of simultaneous commands to run local commandsArg="${2}" # Semi-colon separated list of commands local pid local runningPids=0 local counter=0 local commandsArray local pidsArray local newPidsArray local retval local retvalAll=0 local pidState local commandsArrayPid IFS=';' read -r -a commandsArray <<< "$commandsArg" log "Runnning ${#commandsArray[@]} commands in $numberOfProcesses simultaneous processes." while [ $counter -lt "${#commandsArray[@]}" ] || [ ${#pidsArray[@]} -gt 0 ]; do while [ $counter -lt "${#commandsArray[@]}" ] && [ ${#pidsArray[@]} -lt $numberOfProcesses ]; do log "Running command [${commandsArray[$counter]}]." eval "${commandsArray[$counter]}" & pid=$! pidsArray+=($pid) commandsArrayPid[$pid]="${commandsArray[$counter]}" counter=$((counter+1)) done newPidsArray=() for pid in "${pidsArray[@]}"; do # Handle uninterruptible sleep state or zombies by ommiting them from running process array (How to kill that is already dead ? :) if kill -0 $pid > /dev/null 2>&1; then pidState=$(ps -p$pid -o state= 2 > /dev/null) if [ "$pidState" != "D" ] && [ "$pidState" != "Z" ]; then newPidsArray+=($pid) fi else # pid is dead, get it's exit code from wait command wait $pid retval=$? if [ $retval -ne 0 ]; then log "Command [${commandsArrayPid[$pid]}] failed with exit code [$retval]." retvalAll=$((retvalAll+1)) fi fi done pidsArray=("${newPidsArray[@]}") # Add a trivial sleep time so bash won't eat all CPU sleep .05 done return $retvalAll }
用法:
cmds="du -csh /var;du -csh /tmp;sleep 3;du -csh /root;sleep 10; du -csh /home" # Execute 2 processes at a time ParallelExec 2 "$cmds" # Execute 4 processes at a time ParallelExec 4 "$cmds"
$ DOMAINS =“some命令中的某个域的列表”,用于some-command
foo
eval `some-command for $DOMAINS` & job[$i]=$! i=$(( i + 1))
DONE
Ndomains = echo $DOMAINS |wc -w
对于我在$(seq 1 1 $ Ndomains)做回声“等待$ {job [$ i]}”等待“$ {job [$ i]}”完成
在这个概念将为并行工作。 重要的是eval的最后一行是“&”,它将把命令放在背景上。