在Node.js中等待多个callback的方式

假设你需要做一些依赖临时文件的操作。 既然我们在这里谈论Node,那么这些操作显然是asynchronous的。 什么是等待所有操作完成的惯用方法,以便知道何时可以删除临时文件?

下面是一些代码,显示我想要做什么:

do_something(tmp_file_name, function(err) {}); do_something_other(tmp_file_name, function(err) {}); fs.unlink(tmp_file_name); 

但是,如果我这样写,第三个调用可以在前两个有机会使用该文件之前执行。 我需要一些方法来保证前两个调用已经完成(调用它们的callback函数),然后继续前进,而不是嵌套调用(并使它们在实践中同步)。

我想过在callback中使用事件发射器,并将计数器注册为接收器。 柜台将收到完成的事件,并计算有多less操作仍在进行中。 当最后一个完成时,它会删除该文件。 但是有一个竞争条件的风险,我不知道这通常是如何做这件事情。

Node人如何解决这类问题?

更新:

现在我build议看看:

  • 承诺

    Promise对象用于延迟和asynchronous计算。 Promise代表尚未完成的操作,但预计在未来。

    stream行的承诺库是蓝鸟 。 A会build议看看为什么承诺 。

    你应该使用承诺来打开这个:

     fs.readFile("file.json", function (err, val) { if (err) { console.error("unable to read file"); } else { try { val = JSON.parse(val); console.log(val.success); } catch (e) { console.error("invalid json in file"); } } }); 

    进入这个:

     fs.readFileAsync("file.json").then(JSON.parse).then(function (val) { console.log(val.success); }) .catch(SyntaxError, function (e) { console.error("invalid json in file"); }) .catch(function (e) { console.error("unable to read file"); }); 
  • 发电机:例如通过co 。

    基于生成器的nod​​ejs和浏览器的控制stream程良好性,使用promise,让你以一种很好的方式编写非阻塞代码。

     var co = require('co'); co(function *(){ // yield any promise var result = yield Promise.resolve(true); }).catch(onerror); co(function *(){ // resolve multiple promises in parallel var a = Promise.resolve(1); var b = Promise.resolve(2); var c = Promise.resolve(3); var res = yield [a, b, c]; console.log(res); // => [1, 2, 3] }).catch(onerror); // errors can be try/catched co(function *(){ try { yield Promise.reject(new Error('boom')); } catch (err) { console.error(err.message); // "boom" } }).catch(onerror); function onerror(err) { // log any uncaught errors // co will not throw any errors you do not handle!!! // HANDLE ALL YOUR ERRORS!!! console.error(err.stack); } 

如果我理解正确,我想你应该看看非常好的asynchronous库。 你应该特别看看这个系列 。 只是从github页面的片段副本:

 async.series([ function(callback){ // do some stuff ... callback(null, 'one'); }, function(callback){ // do some more stuff ... callback(null, 'two'); }, ], // optional callback function(err, results){ // results is now equal to ['one', 'two'] }); // an example using an object instead of an array async.series({ one: function(callback){ setTimeout(function(){ callback(null, 1); }, 200); }, two: function(callback){ setTimeout(function(){ callback(null, 2); }, 100); }, }, function(err, results) { // results is now equals to: {one: 1, two: 2} }); 

作为一个加号,这个库也可以在浏览器中运行。

最简单的方法是当您启动一个asynchronous操作时递增一个整数计数器,然后在callback中递减计数器。 根据复杂性,callback可以检查计数器为零,然后删除文件。

更复杂一点是维护一个对象列表,每个对象都有任何需要标识操作的属性(甚至可以是函数调用)以及状态码。 callback将设置状态码完成。

然后你将有一个等待的循环(使用process.nextTick ),并检查是否所有的任务完成。 这种方法相对于计数器的优点是,如果可以完成所有未完成的任务,则在发出所有任务之前,计数器技术会使您过早地删除文件。

 // simple countdown latch function CDL(countdown, completion) { this.signal = function() { if(--countdown < 1) completion(); }; } // usage var latch = new CDL(10, function() { console.log("latch.signal() was called 10 times."); }); 

没有“原生”解决scheme,但是有一百万个节点stream控制库 。 你可能会喜欢步骤:

 Step( function(){ do_something(tmp_file_name, this.parallel()); do_something_else(tmp_file_name, this.parallel()); }, function(err) { if (err) throw err; fs.unlink(tmp_file_name); } ) 

或者,正如迈克尔所说,柜台可能是一个更简单的解决scheme。 看看这个信号量模型 。 你会这样使用它:

 do_something1(file, queue('myqueue')); do_something2(file, queue('myqueue')); queue.done('myqueue', function(){ fs.unlink(file); }); 

我想提供另一个解决scheme,它在节点的核心部分利用编程范例的速度和效率:事件。

用Promises或模块devise来pipe理stream量控制的任何事情,比如async ,都可以使用事件和简单的状态机来完成,我相信这提供了一种比其他选项更容易理解的方法。

例如,假设您希望并行地总结多个文件的长度:

 const EventEmitter = require('events').EventEmitter; // simple event-driven state machine const sm = new EventEmitter(); // running state let context={ tasks: 0, // number of total tasks active: 0, // number of active tasks results: [] // task results }; const next = (result) => { // must be called when each task chain completes if(result) { // preserve result of task chain context.results.push(result); } // decrement the number of running tasks context.active -= 1; // when all tasks complete, trigger done state if(!context.active) { sm.emit('done'); } }; // operational states // start state - initializes context sm.on('start', (paths) => { const len=paths.length; console.log(`start: beginning processing of ${len} paths`); context.tasks = len; // total number of tasks context.active = len; // number of active tasks sm.emit('forEachPath', paths); // go to next state }); // start processing of each path sm.on('forEachPath', (paths)=>{ console.log(`forEachPath: starting ${paths.length} process chains`); paths.forEach((path) => sm.emit('readPath', path)); }); // read contents from path sm.on('readPath', (path) => { console.log(` readPath: ${path}`); fs.readFile(path,(err,buf) => { if(err) { sm.emit('error',err); return; } sm.emit('processContent', buf.toString(), path); }); }); // compute length of path contents sm.on('processContent', (str, path) => { console.log(` processContent: ${path}`); next(str.length); }); // when processing is complete sm.on('done', () => { const total = context.results.reduce((sum,n) => sum + n); console.log(`The total of ${context.tasks} files is ${total}`); }); // error state sm.on('error', (err) => { throw err; }); // ====================================================== // start processing - ok, let's go // ====================================================== sm.emit('start', ['file1','file2','file3','file4']); 

哪个会输出:

开始:开始处理4个path
 forEachPath:启动4个stream程链
   readPath:file1
   readPath:file2
   processContent:file1
   readPath:file3
   processContent:file2
   processContent:file3
   readPath:file4
   processContent:file4
共有4个文件是4021

请注意,stream程链任务的顺序取决于系统负载。

您可以将程序stream程设想为:

开始 - > forEachPath  -  +  - > readPath 1 - > processContent 1 -  +  - >完成
                       +  - > readFile 2 - > processContent 2 -  +
                       +  - > readFile 3 - > processContent 3 -  +
                       +  - > readFile 4 - > processContent 4 -  +

为了重复使用,创build一个模块来支持各种stream控制模式,即串行,并行,批处理,同时,等等,将是微不足道的。

最简单的解决方法是运行do_something *并按照以下顺序取消链接:

 do_something(tmp_file_name, function(err) { do_something_other(tmp_file_name, function(err) { fs.unlink(tmp_file_name); }); }); 

除非出于性能方面的考虑,否则,您希望并行地执行do_something()和do_something_other(),所以我build议保持简单并采用这种方式。

等等。https ://github.com/luciotato/waitfor

使用Wait.for:

 var wait=require('wait.for'); ...in a fiber... wait.for(do_something,tmp_file_name); wait.for(do_something_other,tmp_file_name); fs.unlink(tmp_file_name);