如何监听MongoDB集合的更改?
我使用MongoDB作为数据存储创build了一种后台作业队列系统。 在产生工作人员来处理工作之前,我怎样才能“聆听”插入MongoDB集合? 我是否需要每隔几秒轮询一次以查看上次是否有任何更改,或者脚本是否可以等待插入发生? 这是我正在开发的一个PHP项目,但可以随时在Ruby或语言不可知的情况下回答。
你在想什么听起来很像触发器。 MongoDB对触发器没有任何支持,但是有些人使用一些技巧“翻身”。 这里的关键是oplog。
在副本集中运行MongoDB时,所有的MongoDB操作都被logging到操作日志(称为oplog)中。 oplog基本上只是一个对数据进行修改的运行列表。 通过监听此oplog上的更改,然后在本地应用更改,副本设置function。
这听起来很熟悉吗?
我不能在这里详细描述整个过程,这是几页文档,但是你需要的工具是可用的。
首先在oplog上写一些简单的说明 – 简要描述 – local
集合的布局 (包含oplog)
你也会想要利用可用的游标 。 这些将为您提供一种方式来倾听更改,而不是轮询它们。 请注意,复制使用可放大的游标,所以这是一个受支持的function。
MongoDB具有所谓的capped collections
和tailable cursors
,允许MongoDB将数据推送给听众。
capped collection
本质上是一个固定大小的集合,只允许插入。 以下是创build一个的样子:
db.createCollection("messages", { capped: true, size: 100000000 })
MongoDB Tailable游标( Jonathan H. Wage的原始文章 )
ruby
coll = db.collection('my_collection') cursor = Mongo::Cursor.new(coll, :tailable => true) loop do if doc = cursor.next_document puts doc else sleep 1 end end
PHP
$mongo = new Mongo(); $db = $mongo->selectDB('my_db') $coll = $db->selectCollection('my_collection'); $cursor = $coll->find()->tailable(true); while (true) { if ($cursor->hasNext()) { $doc = $cursor->getNext(); print_r($doc); } else { sleep(1); } }
Python ( 罗伯特·斯图尔特)
from pymongo import Connection import time db = Connection().my_db coll = db.my_collection cursor = coll.find(tailable=True) while cursor.alive: try: doc = cursor.next() print doc except StopIteration: time.sleep(1)
Perl (由Max )
use 5.010; use strict; use warnings; use MongoDB; my $db = MongoDB::Connection->new; my $coll = $db->my_db->my_collection; my $cursor = $coll->find->tailable(1); for (;;) { if (defined(my $doc = $cursor->next)) { say $doc; } else { sleep 1; } }
其他资源:
Ruby / Node.js教程引导你创build一个监听插入到MongoDB上限集合中的应用程序。
一篇文章更详细地讨论可放大的游标。
使用可放大光标的PHP,Ruby,Python和Perl示例。
或者,您可以使用标准的Mongo FindAndUpdate方法,并且在callback中,在callback运行时触发EventEmitter事件(在Node中)。
监听此事件的应用程序或体系结构的任何其他部分都将收到更新通知,并且还会发送相关数据。 这是实现Mongo通知的一个非常简单的方法。
由于MongoDB 3.6将会有一个新的通知API,你可以使用它。 看到这个博客文章为例 。 从它的例子:
cursor = client.my_db.my_collection.changes([ {'$match': { 'operationType': {'$in': ['insert', 'replace']} }}, {'$match': { 'newDocument.n': {'$gte': 1} }} ]) # Loops forever. for change in cursor: print(change['newDocument'])
有一个工作的java例子可以在这里find。
MongoClient mongoClient = new MongoClient(); DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs"); DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get()) .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA); System.out.println("== open cursor =="); Runnable task = () -> { System.out.println("\tWaiting for events"); while (cur.hasNext()) { DBObject obj = cur.next(); System.out.println( obj ); } }; new Thread(task).start();
关键是这里给出的QUERY OPTIONS 。
如果你不需要每次加载所有的数据,你也可以改变find查询。
BasicDBObject query= new BasicDBObject(); query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range query.put("op", "i"); //Only insert operation DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get()) .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
这些答案中的许多只会给你新的logging,而不是更新和/或是非常无效的
唯一可靠的,高性能的方法是在本地db:oplog.rs集合上创build一个可拖动的光标,以获得对MongoDB的所有更改,并使用它执行操作。 (MongoDB甚至可以或多或less地支持复制!)
oplog包含的解释: https ://www.compose.com/articles/the-mongodb-oplog-and-node-js/
一个Node.js库的例子,它提供了一个API,可以使用oplog完成什么工作: https : //github.com/cayasso/mongo-oplog
实际上,不是看输出,而是使用mongoose模式提供的中间件插入新东西时,为什么不注意
您可以捕获插入新文档的事件,并在插入完成后执行某些操作