diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 0a516af82..cd0bf3bc5 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -226,7 +226,10 @@ export class ChangeStream { private async getSnapshotLsn(): Promise { const hello = await this.defaultDb.command({ hello: 1 }); // Basic sanity check - if (hello.msg == 'isdbgrid') { + if (hello.internal?.cosmos_versions != null) { + // Exmaple: internal: { cosmos_versions: [ '1.104-1', '1.105.0', '12.1-1' ] }, + this.logger.info('CosmosDB detected. CosmosDB support is experimental and may not work as expected.'); + } else if (hello.msg == 'isdbgrid') { throw new ServiceError( ErrorCode.PSYNC_S1341, 'Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).' @@ -280,10 +283,13 @@ export class ChangeStream { if (!this.checkpointStreamId.equals(checkpointId)) { continue; } + + // CosmosDB workaround: use walltime const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, + timestamp: mongo.Timestamp.fromBits(0, (changeDocument as any).wallTime!.getTime() / 1000), resume_token: changeDocument._id }); + return lsn; } @@ -389,7 +395,8 @@ export class ChangeStream { const collection = await this.getCollectionInfo(this.defaultDb.databaseName, CHECKPOINTS_COLLECTION); if (collection == null) { await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { - changeStreamPreAndPostImages: { enabled: true } + // Not supported by CosmosDB: + // changeStreamPreAndPostImages: { enabled: true } }); } else if (this.usePostImages && collection.options?.changeStreamPreAndPostImages?.enabled != true) { // Drop + create requires less permissions than collMod, @@ -718,8 +725,9 @@ export class ChangeStream { const pipeline: mongo.Document[] = [ { $match: filters.$match - }, - { $changeStreamSplitLargeEvent: {} } + } + + // { $changeStreamSplitLargeEvent: {} } // not supported on CosmosDB ]; let fullDocument: 'required' | 'updateLookup'; @@ -734,7 +742,7 @@ export class ChangeStream { } const streamOptions: mongo.ChangeStreamOptions = { - showExpandedEvents: true, + // showExpandedEvents: true, // not supported on CosmosDB maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, fullDocument: fullDocument }; @@ -744,7 +752,7 @@ export class ChangeStream { */ if (resumeAfter) { streamOptions.resumeAfter = resumeAfter; - } else { + } else if (streamOptions.startAtOperationTime != null) { // Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the // case if we have an old one. streamOptions.startAtOperationTime = startAfter; @@ -756,7 +764,10 @@ export class ChangeStream { stream = this.client.watch(pipeline, streamOptions); } else { // Same general result, but requires less permissions than the above - stream = this.defaultDb.watch(pipeline, streamOptions); + // Core issue: Watching on an entire database is not supported on CosmosDB. + // stream = this.defaultDb.watch(pipeline, streamOptions); + // Temp workaround just to test other behavior + stream = this.defaultDb.collection('_powersync_checkpoints').watch(pipeline, streamOptions); } this.abort_signal.addEventListener('abort', () => { @@ -835,6 +846,8 @@ export class ChangeStream { break; } + console.log('change doc', originalChangeDocument); + if (originalChangeDocument == null) { // We get a new null document after `maxAwaitTimeMS` if there were no other events. // In this case, stream.resumeToken is the resume token associated with the last response. @@ -961,10 +974,17 @@ export class ChangeStream { if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) { continue; } + + // CosmosDB workaround: use walltime + const wallTime = (changeDocument as any).wallTime; const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, + timestamp: mongo.Timestamp.fromBits(0, wallTime!.getTime() / 1000), resume_token: changeDocument._id }); + // const { comparable: lsn } = new MongoLSN({ + // timestamp: changeDocument.clusterTime!, + // resume_token: changeDocument._id + // }); if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { // Checkpoint out of order - should never happen with MongoDB. // If it does happen, we throw an error to stop the replication - restarting should recover. diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index 4ac11efb7..fcbe28776 100644 --- a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -3,7 +3,7 @@ import { storage } from '@powersync/service-core'; import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules'; -import { ErrorCode, ServiceError } from '@powersync/lib-services-framework'; +import { ErrorCode, ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; import { MongoLSN } from '../common/MongoLSN.js'; import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; @@ -181,9 +181,19 @@ export async function createCheckpoint( session } ); - const time = session.operationTime!; + + let time = session.operationTime; + if (time == null) { + // CosmosDB workaround + const hello = await db.command({ hello: 1 }, { session }); + if (hello.operationTime == null) { + throw new ServiceAssertionError('Failed to create checkpoint: no operation time available'); + } + time = hello.operationTime!; + } + // TODO: Use the above when we support custom write checkpoints - return new MongoLSN({ timestamp: time }).comparable; + return new MongoLSN({ timestamp: time! }).comparable; } finally { await session.endSession(); }