9

My Node app uses MongoDB change streams, and the app runs 3+ instances in production (more eventually, so this will only become a bigger issue as it grows). Right now, when a change comes in, the change stream handler runs as many times as there are processes.

How do I set things up so that the change stream handler only runs once?

Here's what I've got:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(filter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});
1 Comment
  • running Mongo 4.2.19, [email protected] (driver) and at least two instances of the subscribing app - I see only a single consumption per change event. Commented May 11, 2022 at 3:06

6 Answers

7

Doing this with strong guarantees is difficult but not impossible. I wrote about the details of one solution here: https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

The examples are in Java but the important part is the algorithm.

It comes down to a few techniques:

  • Each process attempts to obtain a lock
  • Each lock (or each change) has an associated fencing token
  • Processing each change must be idempotent
  • While processing the change, the token is used to ensure ordered, effectively-once updates.

More details in the blog post.
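The interplay of the lock, the fencing token, and idempotent writes can be modeled in a few lines. This is only an in-memory sketch of the algorithm (the `FencedLock` and `FencedStore` names are mine, not from the post); in production the lock and token would live in MongoDB behind atomic updates.

```javascript
// In-memory sketch of the algorithm above. FencedLock hands out a
// monotonically increasing fencing token with each successful acquisition.
class FencedLock {
  constructor() {
    this.owner = null;
    this.token = 0;
  }
  acquire(processId) {
    if (this.owner !== null) return null; // lock is held, try again later
    this.owner = processId;
    this.token += 1; // each new holder gets a strictly larger token
    return this.token;
  }
  release(processId) {
    if (this.owner === processId) this.owner = null;
  }
}

// FencedStore rejects writes that carry a stale token, which is what makes
// processing effectively-once even if an old lock holder wakes up late.
class FencedStore {
  constructor() {
    this.lastToken = 0;
    this.applied = [];
  }
  write(token, value) {
    if (token < this.lastToken) return false; // stale holder, discard
    this.lastToken = token;
    this.applied.push(value);
    return true;
  }
}
```

A late write from a process that lost the lock carries an old token and is simply discarded, which is why idempotency plus fencing gives effectively-once behavior.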


1 Comment

@Shubham no, unfortunately it is a Java library. But the algorithm and model could be ported to other languages like JS relatively easily I suspect.
5

It can easily be done with only MongoDB query operators. Add a modulo query on the ID field, where the divisor is the number of your app instances (N). The remainder is then an element of {0, 1, 2, ..., N-1}. If your app instances are numbered in ascending order from zero to N-1, you can write the filter like this:

const filter = [
  {
    "$match": {
      "$and": [
        // Other filters
        { "_id": { "$mod": [<number of instances>, <this instance's id>]}}
      ]
    }
  }
];
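For illustration, here is the same predicate expressed client-side, with assumed values for the placeholders. One caveat: a default ObjectId `_id` is not numeric, so the `$mod` filter needs a numeric id field (or a numeric hash of the id) to match against.

```javascript
// Client-side version of the $mod predicate, with assumed values.
const NUM_INSTANCES = 3; // N: total app instances (assumed known up front)
const INSTANCE_ID = 1;   // this instance's index, in 0..N-1

function isMine(numericId) {
  // Same test the $mod filter performs server-side
  return numericId % NUM_INSTANCES === INSTANCE_ID;
}
```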

4 Comments

Thanks, this sounds better than deploying MQTT or Kafka
This will not work if the instances auto scale to meet demand.
This will not work if instance N-1 fails either.
Actually this is a very cool solution. The number of instances could be dynamically updated regularly. There might be times when a few instances get duplicate events until the number of instances is synced, but that would only be temporary, which is fine since subscribers should be idempotent anyway.
1

It sounds like you need a way to partition updates between instances. Have you looked into Apache Kafka? Basically what you would do is have a single application that writes the change data to a partitioned Kafka Topic and have your node application be a Kafka consumer. This would ensure only one application instance ever receives an update.

Depending on your partitioning strategy, you could even ensure that updates for the same record always go to the same node app (if your application needs to maintain its own state). Otherwise, you can spread out the updates in a round robin fashion.

The biggest benefit to using Kafka is that you can add and remove instances without having to adjust configurations. For example, you could start one instance and it would handle all updates. Then, as soon as you start another instance, they each start handling half of the load. You can continue this pattern for as many instances as there are partitions (and you can configure the topic to have 1000s of partitions if you want), that is the power of the Kafka consumer group. Scaling down works in the reverse.


1

While the Kafka option sounded interesting, it was a lot of infrastructure work on a platform I'm not familiar with, so I decided to go with something a little closer to home for me: sending an MQTT message to a little stand-alone app, and letting the MQTT server monitor messages for uniqueness.

siteStream.on("change", async change => {
  console.log("in site change stream");
  // Note: connecting per event is simple but wasteful; a single long-lived
  // client would be more efficient.
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});

I'm still working out the final version of the MQTT server, but the method for evaluating uniqueness of messages will probably store an array of change stream IDs in application memory (there's no need to persist them) and decide whether to proceed any further based on whether that change stream ID has been seen before.

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  var context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});

This doesn't include security, etc., but you get the idea.

5 Comments

Was this actually the best way to handle this in the end? I am currently facing the same issue, where multiple instances are messing up the DB writes I'm trying to do in the watch function. This seems to add a lot more complexity to what I originally thought was a simple implementation of change streams. :(
At this point, my mind goes toward running another node server with a connection to the database which handles all the watching, which is obviously limited to a single instance?
@RyannGalea: Hi, I'm running into the same situation. I'm writing a service which will be responsible for watching changes and publishing them to a RabbitMQ topic. But I still have some concerns, such as a single instance of this service being overloaded by MongoDB writes, resuming the change stream, etc. Anyway, what was your final solution?
There are some ways to distribute the load to multiple producers, such as basing it on the created time of the entity. Refer to Pluralsight's course.
So far it's been fine, but I think of it more as a stopgap solution for the time being. I believe you would need a pretty massive, non-stop volume of messages being sent to RabbitMQ before you would run into scale issues. Will be looking into the course from Pluralsight for a longer-term solution, thanks for sharing @pcuong
1

What about having a field in the DB called status, which is updated using findOneAndUpdate based on the event received from the change stream? Say you get two events at the same time from the change stream. The first event updates the status to "start", and the other throws an error because the status is already "start", so the second event does not run any business logic.
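A minimal in-memory model of this claim step (names are mine; a `Map` stands in for the collection). In MongoDB, the equivalent check-and-set would be a single `findOneAndUpdate` matching on the status field, which is atomic per document:

```javascript
// In-memory model of the claim. The Map stands in for the collection, and
// Node's single-threaded execution stands in for findOneAndUpdate's
// per-document atomicity.
const statuses = new Map(); // event id -> claiming instance id

function tryClaim(eventId, instanceId) {
  if (statuses.has(eventId)) return false; // someone already flipped it
  statuses.set(eventId, instanceId);       // i.e. "status: start"
  return true;                             // this instance won the race
}
```

Every instance calls the claim on each event, but only the single winner proceeds to the business logic.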

4 Comments

What if they both read the status at the same time, before either has set it to "start", and then both of them change it? It's a fragile workaround.
So only one will be able to change it, as the status field will have been updated to `end`, and when the other tries to change it, it will fail. This is something you can guarantee in MongoDB: a single-document update is atomic.
This seems like the best approach. All receivers of the change stream try to flip the flag, and the only one that succeeds processes the event. The only concern is if the change event's _id field is stable for each event.
This is what I do: each instance gets its own ID (uuidv4, something random). Instead of sending "start", make status an object {id: <your-uuid>, timestamp: new Date()}. First you read the value and check whether the status is undefined/null/etc; if it is unset, you write your own uuid and timestamp to the property. Afterwards, you read that value back and, if the ids match, you continue. The timestamp is for detecting abandoned claims, e.g. if the timestamp is > 10 minutes old, assume the claim is stale.
0

I'm not claiming these are rock-solid, production-grade solutions, but I believe something like this could work.

Solution 1

Applying read-modify-write:

  1. Add a version field to the document; all newly created docs have version=0
  2. Receive the ChangeStream event
  3. Read the document that needs to be updated
  4. Perform the update on the model
  5. Increment version
  6. Update the document where both id and version match; otherwise discard the change
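The steps above can be sketched with an in-memory stand-in for the collection (function names are illustrative); the write in step 6 succeeds only if the version read in step 3 is still current:

```javascript
// In-memory model of read-modify-write with a version check. updateIfVersion
// applies the change only if the version the caller read is still current.
const docs = new Map(); // id -> { version, data }

function insertDoc(id, data) {
  docs.set(id, { version: 0, data }); // step 1: created docs start at 0
}

function readDoc(id) {
  return docs.get(id); // steps 2-3: read after receiving the event
}

function updateIfVersion(id, expectedVersion, newData) {
  const doc = docs.get(id);
  if (!doc || doc.version !== expectedVersion) return false; // lost the race
  docs.set(id, { version: expectedVersion + 1, data: newData }); // steps 5-6
  return true;
}
```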

Yes, it creates 2 * n_application_replicas useless queries, so there is another option.

Solution 2

  1. Create a collection of ResumeTokens in Mongo which stores a collection -> token mapping
  2. In the changeStream handler code, after a successful write, update the ResumeToken in the collection
  3. Create a feature toggle that can disable reading the ChangeStream in your application
  4. Configure only a single instance of your application to be a "reader"

In case of "reader" failure you might either enable reading on another node, or redeploy the "reader" node.

As a result: there can be any number of non-reader replicas, and there won't be any useless queries.
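The token bookkeeping from steps 1-2 can be modeled in memory (names are illustrative); the key point is that the token is recorded only after a successful write, so a replacement reader resumes from the last fully processed event:

```javascript
// In-memory model of the resume-token bookkeeping: progress is recorded
// only after the handler succeeds, so a replacement reader picks up from
// the last fully processed event.
const resumeTokens = new Map(); // collection name -> last processed token

function processEvent(collName, event, handler) {
  handler(event);                          // do the work first
  resumeTokens.set(collName, event.token); // then record progress
}

function resumePoint(collName) {
  return resumeTokens.get(collName); // undefined means start from scratch
}
```

In the real thing, `resumePoint` would be read at startup and passed as the `resumeAfter` option of the driver's `watch()` call.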

