212 votes

Comment écouter les modifications apportées à une collection MongoDB ?

Je suis en train de créer une sorte de système de file d'attente pour les travaux en arrière-plan avec MongoDB comme magasin de données. Comment puis-je "écouter" les insertions dans une collection MongoDB avant de créer des travailleurs pour traiter le travail ?

Dois-je interroger toutes les quelques secondes pour voir s'il y a des changements par rapport à la dernière fois, ou existe-t-il un moyen pour mon script d'attendre que les insertions se produisent ?

Il s'agit d'un projet PHP sur lequel je travaille, mais n'hésitez pas à répondre en Ruby ou dans un autre langage.

2 votes

Change Streams a été ajouté à MongoDB 3.6 pour répondre à votre scénario. docs.mongodb.com/manual/changeStreams De plus, si vous utilisez MongoDB Atlas, vous pouvez tirer parti des déclencheurs Stitch qui vous permettent d'exécuter des fonctions en réponse aux insertions/mises à jour/suppressions/etc. docs.mongodb.com/stitch/triggers/overview Il n'est plus nécessaire d'analyser l'oplog.

115voto

Gates VP Points 26481

Ce à quoi vous pensez ressemble beaucoup aux déclencheurs. MongoDB ne prend pas en charge les déclencheurs, mais certaines personnes se sont débrouillées seules en utilisant des astuces. La clé ici est l'oplog.

Lorsque vous exécutez MongoDB dans un ensemble de répliques, toutes les actions de MongoDB sont enregistrées dans un journal des opérations (connu sous le nom d'oplog). L'oplog est essentiellement une liste des modifications apportées aux données. Les ensembles de répliques fonctionnent en écoutant les changements sur cet oplog et en appliquant les changements localement.

Cela vous semble-t-il familier ?

Je ne peux pas détailler l'ensemble du processus ici, il s'agit de plusieurs pages de documentation, mais les outils dont vous avez besoin sont disponibles.

D'abord un peu d'écriture - Brève description - Disposition de la local collection (qui contient l'oplog)

Vous voudrez également tirer parti curseurs disponibles . Cela vous permettra d'écouter les changements au lieu de les demander. Notez que la réplication utilise des curseurs disponibles, il s'agit donc d'une fonctionnalité prise en charge.

1 votes

Hmm... ce n'est pas exactement ce que j'avais en tête. Je n'utilise qu'une seule instance pour l'instant (pas d'esclaves). Alors peut-être une solution plus basique ?

17 votes

Vous pouvez démarrer le serveur à l'aide de la commande --replSet et il créera / remplira le oplog . Même sans le secondaire. C'est certainement la seule façon d'"écouter" les changements dans la base de données.

2 votes

Voici une bonne description de la façon de configurer oplog pour enregistrer localement les modifications apportées à la base de données : loosexaml.wordpress.com/2012/09/03/

107voto

Andrew Points 30079

MongoDB dispose de ce que l'on appelle capped collections y tailable cursors qui permet à MongoDB d'envoyer des données aux auditeurs.

A capped collection est essentiellement une collection de taille fixe qui n'autorise que les insertions. Voici à quoi ressemblerait la création d'une telle collection :

db.createCollection("messages", { capped: true, size: 100000000 })

Curseurs mongoDB disponibles ( article original de Jonathan H. Wage )

Rubis

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (par Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (par Max )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Ressources complémentaires :

Tutoriel Ruby/Node.js qui vous guide dans la création d'une application qui écoute les insertions dans une collection plafonnée de MongoDB.

Un article traitant plus en détail des curseurs disponibles.

Exemples d'utilisation de curseurs disponibles en PHP, Ruby, Python et Perl.

74 votes

Sleep 1 ? vraiment ? pour un code de production ? en quoi cela ne constitue-t-il pas un sondage ?

2 votes

@rbp haha, je n'ai jamais dit que c'était du code de production, mais vous avez raison, dormir une seconde n'est pas une bonne pratique. Je suis presque sûr que cet exemple vient d'ailleurs. Je ne suis pas sûr de savoir comment le refactoriser.

2 votes

Lol, il montrait juste des curseurs disponibles ! il a fait son travail, pourquoi s'embêter avec sleep 1 ! c'est de loin la chose la plus hors de propos de ce post ! c'était une excellente réponse !

50voto

Rio Weber Points 994

Regardez ça : Changez de flux

10 janvier 2018 - Version 3.6

*EDIT : J'ai écrit un article sur la façon de procéder. https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


C'est nouveau en mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Pour utiliser changeStreams la base de données doit être un Ensemble de réplication

En savoir plus sur Replicatio https://docs.mongodb.com/manual/replication/

Votre base de données sera un " Autonome "par défaut.

Comment convertir un ensemble autonome en ensemble réplique : https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


Les éléments suivants ejemplo est une application pratique de cette méthode.
* Spécifiquement pour Node.

/* file.js */
'use strict'

module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Liens utiles :
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

46voto

Mitar Points 1621

Depuis MongoDB 3.6, il existe une nouvelle API de notification appelée Change Streams que vous pouvez utiliser à cette fin. Voir cet article de blog pour un exemple . Exemple :

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

25voto

Robert Walters Points 411

La version 3.6 de MongoDB inclut désormais les flux de changement, qui sont essentiellement une API au-dessus de l'OpLog permettant des cas d'utilisation de type déclencheur/notification.

Voici un lien vers un exemple en Java : http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

Un exemple NodeJS pourrait ressembler à quelque chose comme :

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X