
[MongoDB] Adaptive batchSize to improve change stream timeout handling#609

Merged
rkistner merged 13 commits into main from improve-change-stream-timeout-handling on Apr 21, 2026

Conversation

@rkistner
Contributor

@rkistner rkistner commented Apr 20, 2026

This changes the batchSize values to improve timeout handling for [PSYNC_S1345] Timeout while reading MongoDB ChangeStream errors, helping the service recover automatically. For background on this issue, see #417.

#417 significantly reduced the number of occurrences by improving the pipeline performance on the source database. However, we still see the issue come up occasionally. This PR tweaks the batchSize for the change stream requests to further improve the handling.

To understand how this helps, consider a case where we replicate from database A, which receives a slow stream of updates, while database B in the same cluster receives a very high rate of updates for a couple of minutes. If we fall behind with replication, we could end up with a backlog like this:

1 000 000 updates on B
1 update on A
1 000 000 updates on B
10 updates on A
1 000 000 updates on B
1 update on A
... repeat this pattern another 20x

With our default batchSize of 6000, the change stream request would have to read through all the changes on B before accumulating 6000 changes on A. So, the solution is to make the batch size smaller.

However, we can't universally make the batch size significantly smaller, since that can reduce throughput for the common case where this timeout isn't an issue. So for this, we use adaptive batch sizing:

  1. Always use batchSize: 1 for the aggregate command when opening a change stream. Since this is only run on startup or when retrying, the overhead of the tiny batch size should be small.
  2. batchSize: 6000 for the getMore command by default.
  3. After a timeout, reduce batchSize to 2 for getMore, then double on every batch until it reaches the original batch size.

So if we do run into a timeout, the change stream restarts with a new aggregate command with batchSize: 1, then smaller getMore batch sizes, which allows us to make progress.
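The three rules above can be sketched as a small state machine. This is an illustrative sketch, not the actual PowerSync implementation; the class and method names are hypothetical.

```typescript
// Hypothetical sketch of the adaptive batch sizing rules described above.
class AdaptiveBatchSize {
  // Rule 1: the initial `aggregate` command always uses batchSize: 1.
  readonly aggregateBatchSize = 1;

  private current: number;

  constructor(private readonly maxBatchSize: number = 6000) {
    // Rule 2: getMore defaults to the full batch size (6000).
    this.current = maxBatchSize;
  }

  // batchSize to use for the next getMore command.
  nextGetMoreBatchSize(): number {
    return this.current;
  }

  // Rule 3, first half: after a timeout, restart small.
  onTimeout(): void {
    this.current = 2;
  }

  // Rule 3, second half: double on every successful batch,
  // capped at the original batch size.
  onBatchSucceeded(): void {
    this.current = Math.min(this.current * 2, this.maxBatchSize);
  }
}
```

Note that progress is never lost to the small sizes for long: a handful of successful batches is enough to climb back to the full 6000.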

The original attempt only used batchSize: 1 for the aggregate command, without changing the batchSize for the getMore command. There are, however, edge cases where that could result in glacial replication lag. For example, suppose we have this backlog:

5000 updates on A
1 000 000 updates on B
10 updates on A
1 000 000 updates on B
10 updates on A
...

With this backlog, we can expect a batchSize: 6000 request to always fail. But with only batchSize: 1 on the aggregate command, we only move one record further, then fail again on the next getMore, effectively replicating around 1 change per minute until we move past those original 5000.

By reducing the batchSize for getMore and doubling it again, we should be able to make a lot more progress before running into the timeout. In the above example, we'd make requests with batchSize of 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, before the next request of 4096 fails, after which we restart at 1 again.
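A quick back-of-envelope check on the example above shows why this matters. Using only the numbers from the text (doubling from 1 up to 2048, with the 4096 request assumed to time out):

```typescript
// Changes replicated per timeout cycle under the doubling scheme,
// using the batch sizes listed in the example above.
const sizes: number[] = [];
for (let size = 1; size <= 2048; size *= 2) {
  sizes.push(size);
}
const perCycle = sizes.reduce((a, b) => a + b, 0);
// 12 requests: 1, 2, 4, ..., 2048, totaling 4095 changes per cycle,
// versus 1 change per cycle with only batchSize: 1 on aggregate.
console.log(sizes.length, perCycle);
```

So each retry cycle moves roughly 4000 changes forward instead of one, clearing the 5000-change head of the backlog in a couple of cycles rather than thousands.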

Tests

This configures the test setup to use enableTestCommands=1 in the MongoDB server. That allows us to inject error responses for specific commands, making testing these edge cases easier.
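With enableTestCommands=1, MongoDB's failCommand failpoint can be armed via an admin command to inject errors into specific commands. The sketch below is an illustration of that mechanism, not the PR's actual test code; the client setup is assumed. It is not accompanied by a test here since it needs a live mongod started with enableTestCommands=1.

```typescript
import { MongoClient } from "mongodb";

// Illustrative helper (hypothetical name): make the next getMore command
// fail with MaxTimeMSExpired (code 50), the error seen in PSYNC_S1345.
async function failNextGetMore(client: MongoClient): Promise<void> {
  await client.db("admin").command({
    configureFailPoint: "failCommand",
    mode: { times: 1 },
    data: { failCommands: ["getMore"], errorCode: 50 }
  });
}

// Disarm the failpoint again after the test:
async function clearFailPoint(client: MongoClient): Promise<void> {
  await client.db("admin").command({
    configureFailPoint: "failCommand",
    mode: "off"
  });
}
```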

Flush

This now does a flush() and setResumeToken() on each change stream batch. This makes sure that we persist progress if we run into a fatal change stream error. This is only possible now that we're using raw change streams.
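The per-batch persistence step can be sketched as follows. The ProgressStore interface and function names here are hypothetical stand-ins; the real interfaces in powersync-service differ. The key property is the ordering: buffered writes are flushed before the resume token is advanced, so a crash never records a token for data that was not persisted.

```typescript
// Hypothetical interface standing in for the storage batch.
interface ProgressStore {
  flush(): Promise<void>;
  setResumeToken(token: string): Promise<void>;
}

// Sketch of processing one change stream batch with per-batch persistence.
async function processBatch(
  store: ProgressStore,
  changes: unknown[],
  resumeToken: string,
  apply: (change: unknown) => void
): Promise<void> {
  for (const change of changes) {
    apply(change); // apply the change to storage buffers
  }
  // Persist buffered writes first, then record how far we got, so a
  // restart after a fatal error resumes from this batch's resume token.
  await store.flush();
  await store.setResumeToken(resumeToken);
}
```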

This change also breaks our logic for detecting a change in database URI. The old behavior was not reliable here, since it depends on which type of resume token happened to be used. We can revisit this in the future, looking into different mechanisms to detect the database change (such as persisting and comparing the name).

@changeset-bot

changeset-bot Bot commented Apr 20, 2026

🦋 Changeset detected

Latest commit: 8f60b65

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 12 packages
Name Type
@powersync/service-module-mongodb Patch
@powersync/service-schema Patch
@powersync/service-image Patch
@powersync/service-core Patch
@powersync/service-core-tests Patch
@powersync/service-module-core Patch
@powersync/service-module-mongodb-storage Patch
@powersync/service-module-mssql Patch
@powersync/service-module-mysql Patch
@powersync/service-module-postgres-storage Patch
@powersync/service-module-postgres Patch
test-client Patch


@rkistner rkistner changed the title [MongoDB] Improve change stream timeout handling [MongoDB] Adaptive batchSize to improve change stream timeout handling Apr 20, 2026
@rkistner rkistner marked this pull request as ready for review April 20, 2026 11:42
@rkistner
Contributor Author

For reference, this is what the logs look like now when manually triggering failures on getMore:

info: [powersync_3] Processed batch of 1 changes in 35ms
warn: [powersync_3] Resumable change stream error, retrying: Failing command via 'failCommand' failpoint Failing command via 'failCommand' failpoint {"$clusterTime":{"data":[88,0,0,0,17,99,108,117,115,116,101,114,84,105,109,101,0,2,0,0,0,141,21,230,105,3,115,105,103,110,97,116,117,114,101,0,51,0,0,0,5,104,97,115,104,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,107,101,121,73,100,0,0,0,0,0,0,0,0,0,0,0],"type":"Buffer"},"code":50,"codeName":"MaxTimeMSExpired","errorLabelSet":{},"errorResponse":{"$clusterTime":{"data":[88,0,0,0,17,99,108,117,115,116,101,114,84,105,109,101,0,2,0,0,0,141,21,230,105,3,115,105,103,110,97,116,117,114,101,0,51,0,0,0,5,104,97,115,104,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,107,101,121,73,100,0,0,0,0,0,0,0,0,0,0,0],"type":"Buffer"},"code":50,"codeName":"MaxTimeMSExpired","errmsg":"Failing command via 'failCommand' failpoint","ok":0,"operationTime":{"$timestamp":"7630810313960456194"}},"ok":0,"operationTime":{"$timestamp":"7630810313960456194"},"stack":"MongoServerError: Failing command via 'failCommand' failpoint\n    at Connection.sendCommand (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/cmap/connection.js:306:27)\n    at async Connection.command (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/cmap/connection.js:334:26)\n    at async Server.command (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/sdam/server.js:194:29)\n    at async tryOperation (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/operations/execute_operation.js:213:32)\n    at async executeOperation 
(/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/operations/execute_operation.js:78:16)\n    at async Db.command (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/db.js:189:16)\n    at async rawChangeStreamInner (file:///home/ralf/src/powersync-service/modules/module-mongodb/dist/replication/RawChangeStream.js:127:35)\n    at async rawChangeStream (file:///home/ralf/src/powersync-service/modules/module-mongodb/dist/replication/RawChangeStream.js:48:28)\n    at async file:///home/ralf/src/powersync-service/modules/module-mongodb/dist/replication/ChangeStream.js:671:28\n    at async MongoSyncBucketStorageV1.startBatch (file:///home/ralf/src/powersync-service/modules/module-mongodb-storage/dist/storage/implementation/MongoSyncBucketStorage.js:129:9)"}
info: [powersync_3] Processed batch of 1 changes in 4ms
warn: [powersync_3] Resumable change stream error, retrying: Failing command via 'failCommand' failpoint Failing command via 'failCommand' failpoint {"$clusterTime":{"data":[88,0,0,0,17,99,108,117,115,116,101,114,84,105,109,101,0,1,0,0,0,142,21,230,105,3,115,105,103,110,97,116,117,114,101,0,51,0,0,0,5,104,97,115,104,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,107,101,121,73,100,0,0,0,0,0,0,0,0,0,0,0],"type":"Buffer"},"code":50,"codeName":"MaxTimeMSExpired","errorLabelSet":{},"errorResponse":{"$clusterTime":{"data":[88,0,0,0,17,99,108,117,115,116,101,114,84,105,109,101,0,1,0,0,0,142,21,230,105,3,115,105,103,110,97,116,117,114,101,0,51,0,0,0,5,104,97,115,104,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,107,101,121,73,100,0,0,0,0,0,0,0,0,0,0,0],"type":"Buffer"},"code":50,"codeName":"MaxTimeMSExpired","errmsg":"Failing command via 'failCommand' failpoint","ok":0,"operationTime":{"$timestamp":"7630810318255423489"}},"ok":0,"operationTime":{"$timestamp":"7630810318255423489"},"stack":"MongoServerError: Failing command via 'failCommand' failpoint\n    at Connection.sendCommand (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/cmap/connection.js:306:27)\n    at async Connection.command (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/cmap/connection.js:334:26)\n    at async Server.command (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/sdam/server.js:194:29)\n    at async tryOperation (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/operations/execute_operation.js:213:32)\n    at async executeOperation 
(/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/operations/execute_operation.js:78:16)\n    at async Db.command (/home/ralf/src/powersync-service/node_modules/.pnpm/mongodb@6.20.0_@mongodb-js+zstd@2.0.1_snappy@7.3.2_socks@2.8.3/node_modules/mongodb/lib/db.js:189:16)\n    at async rawChangeStreamInner (file:///home/ralf/src/powersync-service/modules/module-mongodb/dist/replication/RawChangeStream.js:127:35)\n    at async rawChangeStream (file:///home/ralf/src/powersync-service/modules/module-mongodb/dist/replication/RawChangeStream.js:48:28)\n    at async file:///home/ralf/src/powersync-service/modules/module-mongodb/dist/replication/ChangeStream.js:671:28\n    at async MongoSyncBucketStorageV1.startBatch (file:///home/ralf/src/powersync-service/modules/module-mongodb-storage/dist/storage/implementation/MongoSyncBucketStorage.js:129:9)"}
info: [powersync_3] Processed batch of 1 changes in 4ms
info: [powersync_3] Processed batch of 2 changes in 2ms
info: [powersync_3] Processed batch of 4 changes in 1ms
info: [powersync_3] Processed batch of 8 changes in 1ms
info: [powersync_3] Processed batch of 16 changes in 1ms
info: [powersync_3] Processed batch of 32 changes in 1ms
info: [powersync_3] Processed batch of 64 changes in 1ms
info: [powersync_3] Processed batch of 128 changes in 5ms
info: [powersync_3] Processed batch of 256 changes in 6ms
info: [powersync_3] Processed batch of 512 changes in 7ms
info: [powersync_3] Processed batch of 1024 changes in 10ms
info: [powersync_3] Processed batch of 2048 changes in 11ms
info: [powersync_3] Processed batch of 125 changes in 10ms

@rkistner rkistner requested a review from Sleepful April 20, 2026 12:57
khawarizmus
khawarizmus previously approved these changes Apr 21, 2026

@khawarizmus khawarizmus left a comment


LGTM. Really good approach.

@rkistner rkistner merged commit 176885c into main Apr 21, 2026
44 checks passed
@rkistner rkistner deleted the improve-change-stream-timeout-handling branch April 21, 2026 08:07
