From adfd90e7686a38a3c3b0d520541fd705f5448313 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 30 Jul 2025 17:22:06 +0200 Subject: [PATCH] POC of using snapshot queries. --- .../implementation/MongoBucketBatch.ts | 13 +++++--- .../implementation/MongoSyncBucketStorage.ts | 32 ++++++++++++++++--- .../src/storage/implementation/models.ts | 4 ++- .../src/storage/SyncRulesBucketStorage.ts | 3 +- .../src/sync/BucketChecksumState.ts | 2 +- 5 files changed, 42 insertions(+), 12 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index ceee3542e..d36881c5f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -718,6 +718,7 @@ export class MongoBucketBatch if (this.persisted_op != null) { update.last_checkpoint = this.persisted_op; + update.last_checkpoint_clustertime = '$$CLUSTER_TIME' as any; } // Mark relevant write checkpoints as "processed". @@ -741,10 +742,14 @@ export class MongoBucketBatch { _id: this.group_id }, - { - $set: update, - $unset: { snapshot_lsn: 1 } - }, + [ + { + $set: update + }, + { + $unset: ['snapshot_lsn'] + } + ], { session: this.session } ); await this.autoActivate(lsn); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 6a0cfcf04..7b3c1a31a 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -109,17 +109,19 @@ export class MongoSyncBucketStorage const doc = await this.db.sync_rules.findOne( { _id: this.group_id }, { - projection: { last_checkpoint: 1, last_checkpoint_lsn: 1, snapshot_done: 1 } + projection: { last_checkpoint: 1, last_checkpoint_lsn: 1, snapshot_done: 1, last_checkpoint_clustertime: 1 } } ); if (!doc?.snapshot_done) { return { checkpoint: 0n, + clusterTime: null, lsn: null }; } return { checkpoint: doc?.last_checkpoint ?? 0n, + clusterTime: doc?.last_checkpoint_clustertime ?? null, lsn: doc?.last_checkpoint_lsn ?? null }; } @@ -261,17 +263,24 @@ export class MongoSyncBucketStorage return result!; } - async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise { + async getParameterSets(checkpoint: ReplicationCheckpoint, lookups: ParameterLookup[]): Promise { + console.trace('getParameterSets', checkpoint); const lookupFilter = lookups.map((lookup) => { return storage.serializeLookup(lookup); }); + // 1. Filter by sync rules version (group_id) + // 2. Filter by lookup keys + // 3. Filter by checkpoint (_id <= checkpoint) + // 4. Return the latest parameter set for each (key, lookup) + // This is indexed on {'key.g', lookup, _id}, which should cover the $match stage. + // To prevent having too many results after the $match stage, we need to compact periodically. const rows = await this.db.bucket_parameters .aggregate([ { $match: { 'key.g': this.group_id, lookup: { $in: lookupFilter }, - _id: { $lte: checkpoint } + _id: { $lte: checkpoint.checkpoint } } }, { @@ -289,6 +298,17 @@ export class MongoSyncBucketStorage } ]) .toArray(); + + if (checkpoint.clusterTime != null) { + console.log('snapshot query', checkpoint.clusterTime); + const d = await this.db.db.command({ + find: 'bucket_parameters', + readConcern: { + level: 'snapshot', + atClusterTime: checkpoint.clusterTime + } + }); + } const groupedParameters = rows.map((row) => { return row.bucket_parameters; }); @@ -661,9 +681,10 @@ export class MongoSyncBucketStorage return new MongoCompactor(this.db, this.group_id, options).compact(); } - private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) { + private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null): ReplicationCheckpoint { return { checkpoint: doc?.last_checkpoint ?? 0n, + clusterTime: doc?.last_checkpoint_clustertime ?? null, lsn: doc?.last_checkpoint_lsn ?? null }; } @@ -698,7 +719,8 @@ export class MongoSyncBucketStorage _id: 1, state: 1, last_checkpoint: 1, - last_checkpoint_lsn: 1 + last_checkpoint_lsn: 1, + last_checkpoint_clustertime: 1 } } ); diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 33eac22d8..c34f7a492 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -137,6 +137,8 @@ export interface SyncRuleDocument { */ last_checkpoint: bigint | null; + last_checkpoint_clustertime?: bson.Timestamp | null; + /** * The LSN associated with the last consistent checkpoint. */ @@ -185,7 +187,7 @@ export interface CheckpointEventDocument { export type SyncRuleCheckpointState = Pick< SyncRuleDocument, - 'last_checkpoint' | 'last_checkpoint_lsn' | '_id' | 'state' + 'last_checkpoint' | 'last_checkpoint_lsn' | '_id' | 'state' | 'last_checkpoint_clustertime' >; export interface CustomWriteCheckpointDocument { diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 6b27e3c7c..3ad0d7159 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -69,7 +69,7 @@ export interface SyncRulesBucketStorage /** * Used to resolve "dynamic" parameter queries. */ - getParameterSets(checkpoint: util.InternalOpId, lookups: ParameterLookup[]): Promise; + getParameterSets(checkpoint: ReplicationCheckpoint, lookups: ParameterLookup[]): Promise; /** * Given two checkpoints, return the changes in bucket data and parameters that may have occurred @@ -243,6 +243,7 @@ export interface SyncBucketDataChunk { export interface ReplicationCheckpoint { readonly checkpoint: util.InternalOpId; readonly lsn: string | null; + readonly clusterTime: unknown; } export interface WatchWriteCheckpointOptions { diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index 2bbd7c8cd..a425c0a0f 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -440,7 +440,7 @@ export class BucketParameterState { if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) { dynamicBuckets = await querier.queryDynamicBucketDescriptions({ getParameterSets(lookups) { - return storage.getParameterSets(checkpoint.base.checkpoint, lookups); + return storage.getParameterSets(checkpoint.base, lookups); } }); this.cachedDynamicBuckets = dynamicBuckets;