在node.js中协调并行执行

node.js的事件驱动编程模型使协调程序stream程变得有些棘手。

简单的顺序执行变成了嵌套的callback,这很容易(尽pipe有点复杂的写下来)。

但是并行执行呢? 假设你有三个可以并行运行的任务A,B,C,当他们完成时,你想把结果发送给任务D.

用叉/join模式,这将是

  • 叉子
  • 叉B
  • 叉C
  • joinA,B,C,运行D

如何在node.js中写入? 有没有最佳做法或食谱? 我是否每次都必须手动推出解决scheme ,还是有一些图书馆有帮​​手?

因为它是单线程的,所以在node.js中没有什么是真正的并行的。 但是,可以安排多个事件并按照事先不能确定的顺序运行。 而像数据库访问这样的东西实际上是“并行的”,因为数据库查询本身是在单独的线程中运行的,但是在完成时重新集成到事件stream中。

那么,你如何安排多个事件处理程序的callback呢? 那么,这是浏览器端JavaScript中animation中常用的一种技术:使用variables来追踪完成。

这听起来像是一个破解,而且听起来有点麻烦,留下一堆全局variables来进行跟踪,而用更less的语言就会出现这种情况。 但在JavaScript中,我们可以使用闭包:

function fork (async_calls, shared_callback) { var counter = async_calls.length; var callback = function () { counter --; if (counter == 0) { shared_callback() } } for (var i=0;i<async_calls.length;i++) { async_calls[i](callback); } } // usage: fork([A,B,C],D); 

在上面的例子中,我们通过假设asynchronous和callback函数不需要参数来保持代码简单。 您当然可以修改代码以将parameter passing给asynchronous函数,并使callback函数累积结果并将其传递给shared_callback函数。


附加答案:

实际上,即使这样, fork()函数也可以使用闭包将parameter passing给asynchronous函数:

 fork([ function(callback){ A(1,2,callback) }, function(callback){ B(1,callback) }, function(callback){ C(1,2,callback) } ],D); 

剩下唯一要做的就是积累A,B,C的结果并将它们传递给D.


更多的附加答案:

我无法抗拒。 在早餐时保持对此的思考。 下面是fork()的一个实现,它累积结果(通常作为parameter passing给callback函数):

 function fork (async_calls, shared_callback) { var counter = async_calls.length; var all_results = []; function makeCallback (index) { return function () { counter --; var results = []; // we use the arguments object here because some callbacks // in Node pass in multiple arguments as result. for (var i=0;i<arguments.length;i++) { results.push(arguments[i]); } all_results[index] = results; if (counter == 0) { shared_callback(all_results); } } } for (var i=0;i<async_calls.length;i++) { async_calls[i](makeCallback(i)); } } 

这很容易。 这使得fork()相当通用,可以用来同步多个非同类事件。

Node.js中的示例用法:

 // Read 3 files in parallel and process them together: function A (c){ fs.readFile('file1',c) }; function B (c){ fs.readFile('file2',c) }; function C (c){ fs.readFile('file3',c) }; function D (result) { file1data = result[0][1]; file2data = result[1][1]; file3data = result[2][1]; // process the files together here } fork([A,B,C],D); 

更新

这个代码是在像async.js或各种基于promise的库之类的库之前编写的。 我想相信,async.js是受这个启发,但我没有任何证据。 无论如何..如果你今天想这样做看看async.js或承诺。 只要考虑一下上面的答案,就可以很好地解释/ async.parallel这样的工作。

我相信现在“asynchronous”模块提供了这种并行function,并且与上面的fork函数大致相同。

期货模块有一个名为join的子模块,我喜欢使用:

将asynchronous调用联合在一起,类似于pthread_join对于线程的工作方式。

自述文件展示了使用自由式或使用Promise模式使用未来子模块的一些很好的例子。 来自文档的示例:

 var Join = require('join') , join = Join() , callbackA = join.add() , callbackB = join.add() , callbackC = join.add(); function abcComplete(aArgs, bArgs, cArgs) { console.log(aArgs[1] + bArgs[1] + cArgs[1]); } setTimeout(function () { callbackA(null, 'Hello'); }, 300); setTimeout(function () { callbackB(null, 'World'); }, 500); setTimeout(function () { callbackC(null, '!'); }, 400); // this must be called after all join.when(abcComplete); 

一个简单的解决scheme可能在这里: http : //howtonode.org/control-flow-part-ii滚动到并行操作。 另一种方法是使A,B和C共享相同的callback函数,如果所有三个函数都调用了callback函数,那么让函数运行D,则该函数具有全局函数或者至less不是函数的增量函数,当然你也必须把A,B和C的结果存储在某个地方。

另一个选项可以是Node的Step模块: https : //github.com/creationix/step

你可能想试试这个小图书馆: https : //www.npmjs.com/package/parallel-io

除了stream行的承诺和asynchronous库,还有第三种优雅的方式 – 使用“布线”:

 var l = new Wire(); funcA(l.branch('post')); funcB(l.branch('comments')); funcC(l.branch('links')); l.success(function(results) { // result will be object with results: // { post: ..., comments: ..., links: ...} }); 

https://github.com/garmoshka-mo/mo-wire