连续对一个mongodb游标进行迭代(在移动到下一个文档之前等待callback)
使用mongoskin,我可以做这样的查询,这将返回一个游标:
myCollection.find({}, function(err, resultCursor) { resultCursor.each(function(err, result) { } }
不过,我想为每个文档调用一些asynchronous函数,并且在这个callback之后才移动到游标上的下一个项目(类似于async.js模块中的eachSeries结构)。 例如:
myCollection.find({}, function(err, resultCursor) { resultCursor.each(function(err, result) { externalAsyncFunction(result, function(err) { //externalAsyncFunction completed - now want to move to next doc }); } }
我怎么能这样做?
谢谢
更新:
我不想使用toArray()
因为这是一个大的批处理操作,并且结果可能不适合内存。
如果您不想使用toArray将所有结果加载到内存中,则可以使用类似于以下内容的光标进行迭代。
myCollection.find({}, function(err, resultCursor) { function processItem(err, item) { if(item === null) { return; // All done! } externalAsyncFunction(item, function(err) { resultCursor.nextObject(processItem); }); } resultCursor.nextObject(processItem); }
使用async
/ await
更现代的方法:
const cursor = db.collection("foo").find({}); while(await cursor.hasNext()) { const doc = await cursor.next(); // process doc here }
笔记:
- 当asynchronous迭代器到达时,这可能更简单。
- 你可能会想添加try / catch来进行错误检查。
- 包含函数应该是
async
或者代码应该被包装在(async function() { ... })()
因为它使用了await
。 - 如果你想添加
await new Promise(resolve => setTimeout(resolve, 1000));
(暂停1秒)在while循环的结尾处,以显示它一个接一个地处理文档。
这通过使用setImmediate对大数据集起作用:
var cursor = collection.find({filter...}).cursor(); cursor.nextObject(function fn(err, item) { if (err || !item) return; setImmediate(fnAction, item, arg1, arg2, function() { cursor.nextObject(fn); }); }); function fnAction(item, arg1, arg2, callback) { // Here you can do whatever you want to do with your item. return callback(); }
如果有人正在寻找这样做的承诺方式(而不是使用nextObject的callback),在这里。 我正在使用Node v4.2.2和mongo驱动程序v2.1.7。 这是Cursor.forEach()
的一个asyncSeries版本:
function forEachSeries(cursor, iterator) { return new Promise(function(resolve, reject) { var count = 0; function processDoc(doc) { if (doc != null) { count++; return iterator(doc).then(function() { return cursor.next().then(processDoc); }); } else { resolve(count); } } cursor.next().then(processDoc); }); }
为了使用这个,传递游标和一个迭代器,这个迭代器asynchronous地处理每个文档(就像Cursor.forEach一样)。 迭代器需要返回一个promise,就像大多数mongodb本地驱动函数一样。
说,你想更新收集test
中的所有文件。 这是你如何做到这一点:
var theDb; MongoClient.connect(dbUrl).then(function(db) { theDb = db; // save it, we'll need to close the connection when done. var cur = db.collection('test').find(); return forEachSeries(cur, function(doc) { // this is the iterator return db.collection('test').updateOne( {_id: doc._id}, {$set: {updated: true}} // or whatever else you need to change ); // updateOne returns a promise, if not supplied a callback. Just return it. }); }) .then(function(count) { console.log("All Done. Processed", count, "records"); theDb.close(); })
你可以得到一个Array
的结果,并使用recursion函数迭代,就像这样。
myCollection.find({}).toArray(function (err, items) { var count = items.length; var fn = function () { externalAsyncFuntion(items[count], function () { count -= 1; if (count) fn(); }) } fn(); });
你可以用async lib做这样的事情。 这里的关键是检查当前的文档是否为空。 如果是,那就意味着你已经完成了。
async.series([ function (cb) { cursor.each(function (err, doc) { if (err) { cb(err); } else if (doc === null) { cb(); } else { console.log(doc); array.push(doc); } }); } ], function (err) { callback(err, array); });
你可以使用未来:
myCollection.find({}, function(err, resultCursor) { resultCursor.count(Meteor.bindEnvironment(function(err,count){ for(var i=0;i<count;i++) { var itemFuture=new Future(); resultCursor.nextObject(function(err,item)){ itemFuture.result(item); } var item=itemFuture.wait(); //do what you want with the item, //and continue with the loop if so } })); });
你可以使用简单的setTimeOut's。 这是在nodejs上运行的typecript中的一个例子(我通过'when'模块使用promise,但是也可以在没有它们的情况下完成):
import mongodb = require("mongodb"); var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {}); var db = new mongodb.Db('myDb', dbServer); var util = require('util'); var when = require('when'); //npm install when var dbDefer = when.defer(); db.open(function() { console.log('db opened...'); dbDefer.resolve(db); }); dbDefer.promise.then(function(db : mongodb.Db){ db.collection('myCollection', function (error, dataCol){ if(error) { console.error(error); return; } var doneReading = when.defer(); var processOneRecordAsync = function(record) : When.Promise{ var result = when.defer(); setTimeout (function() { //simulate a variable-length operation console.log(util.inspect(record)); result.resolve('record processed'); }, Math.random()*5); return result.promise; } var runCursor = function (cursor : MongoCursor){ cursor.next(function(error : any, record : any){ if (error){ console.log('an error occurred: ' + error); return; } if (record){ processOneRecordAsync(record).then(function(r){ setTimeout(function() {runCursor(cursor)}, 1); }); } else{ //cursor up doneReading.resolve('done reading data.'); } }); } dataCol.find({}, function(error, cursor : MongoCursor){ if (!error) { setTimeout(function() {runCursor(cursor)}, 1); } }); doneReading.promise.then(function(message : string){ //message='done reading data' console.log(message); }); }); });