4

I'm attempting to listen for changes in my mongodb cluster, using change streams; however after following several tutorials, my final implementation using mongoose doesn't work. How can I use the current mongoose connection to listen for changes in the database

mongoose connection:

mongoose
.connect(db, {
    useNewUrlParser: true,
    useFindAndModify: false,
    useUnifiedTopology: true
    // useCreateIndex: true
})
.then(() => {
    console.log("Connected to MongoDB...");
})
.catch(err => {
    console.log(err);
});

Change stream:

const pipeline = { 
  $match: {
    $or: [{ operationType: 'insert' },{ operationType: 'update' }], 
    'fullDocument.institution': uniId 
  } 
};

const changeStream = Post.watch([pipeline], {fullDocument: 'updateLookup'});

changeStream.on("change", next => {
        switch(next.operationType) {
          case 'insert':
            console.log('an insert happened...', "uni_ID: ", next.fullDocument.institution);
            let rooms = Object.keys(socket.rooms);
            console.log("rooms: ", rooms);

            nmsps.emit('insert', {
              type: 'insert',
              msg: 'New question available',
              newPost: next.fullDocument
            });
            break;

          case 'update':
            console.log('an update happened...');

            nmsps.emit('update', {
              type: 'update',
              postId: next.documentKey._id,
              updateInfo: next.updateDescription.updatedFields,
              msg: "Question has been updated."
            });
            break;

          case 'delete':
            console.log('a delete happened...');

            nmsps.emit('delete', {
              type: 'delete',
              deletedId: next.documentKey._id,
              msg: 'Question has been deleted.'
            });
            break;

          default:
            break;
       }
 })

2 Answers 2

2

Because mongoose is using mongodb driver as the core module, you can use mongodb client to watch the change stream.

After connected:

const client = mongoose.connection.client;
const db = client.db('dbName');
const collection = db.collection('collectionName');
const changeStream = collection.watch();
changeStream.on('change', next => {
    
});

Sign up to request clarification or add additional context in comments.

Comments

0

He is a code of working solution with changeStream

const mongoose = require('mongoose')
const { connection } = require('../boot/mongo')

const Schema = mongoose.Schema

const status = new Schema({
    _id: {
        type: mongoose.Schema.Types.Number,
        required: true
    },
    receiveTs: {
        type: mongoose.Schema.Types.Date,
        required: true
    }
})

const OnlineStatusSchema = connection.model('Status', Status, 'status')

const pipeline = [
    {
        $match: {
            $or: [{ operationType: 'insert' }, { operationType: 'update' }]
        }
    },
    { $project: { 'fullDocument._id': 1, 'fullDocument.receiveTs': 1 } }
]



const changeStream = OnlineStatusSchema.watch(pipeline)

changeStream.on('change', async (change) => {
    // get meters reading log for respective platfrom and date
    try {
        console.log(change)
    } catch (error) {
        throw error
    }
})

module.exports = OnlineStatusSchema

1 Comment

where is the close change stream section? please add it

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.