diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cb1814b..0e0d34b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -3,13 +3,15 @@ name: Test on: push: branches: - - "**" + - "*" + pull_request: jobs: build: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 - name: Install Melos @@ -29,6 +31,7 @@ jobs: test: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) strategy: fail-fast: false matrix: @@ -49,7 +52,7 @@ jobs: sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" dart_sdk: stable steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 with: sdk: ${{ matrix.dart_sdk }} diff --git a/CHANGELOG.md b/CHANGELOG.md index cafde71..0342a48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,90 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-11-04 + +### Changes + +--- + +Packages with breaking changes: + + - `sqlite_async` uses version 0.4.x of `sqlite3_web`, which requires a new worker. + +Packages with other changes: + + - [`sqlite_async` - `v0.13.0`](#sqlite_async---v0130) + - [`drift_sqlite_async` - `v0.2.6`](#drift_sqlite_async---v026) + +--- + +#### `sqlite_async` - `v0.13.0` + + - Update sqlite3_web to 0.4.0 + +#### `drift_sqlite_async` - `v0.2.6` + +- Support latest `sqlite_async`. + +## 2025-10-13 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.2`](#sqlite_async---v0122) + - [`drift_sqlite_async` - `v0.2.5`](#drift_sqlite_async---v025) + +--- + +#### `sqlite_async` - `v0.12.2` + + - Add `withAllConnections` method to run statements on all connections in the pool. + +#### `drift_sqlite_async` - `v0.2.5` + + - Allow customizing update notifications from `sqlite_async`. + + +## 2025-08-08 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.1`](#sqlite_async---v0121) + - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) + - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) + +Packages with dependency updates only: + +> Packages listed below depend on other packages in this workspace that have had changes. Their versions have been incremented to bump the minimum dependency versions of the packages they depend upon in this project. + + - `drift_sqlite_async` - `v0.2.3+1` + +--- + +#### `sqlite_async` - `v0.12.1` + +- Fix distributing updates from shared worker. + +#### `sqlite_async` - `v0.12.0` + + - Avoid large transactions creating a large internal update queue. + + ## 2025-07-29 --- diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index e70cd40..bd780e9 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,19 @@ +## 0.2.6 + +- Support latest `sqlite_async`. + +## 0.2.5 + + - Allow customizing update notifications from `sqlite_async`. + +## 0.2.4 + +- Allow transforming table updates from sqlite_async. + +## 0.2.3+1 + + - Update a dependency to the latest release. + ## 0.2.3 - Support nested transactions. diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index a1af55a..6c34ed4 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -15,12 +15,22 @@ import 'package:sqlite_async/sqlite_async.dart'; class SqliteAsyncDriftConnection extends DatabaseConnection { late StreamSubscription _updateSubscription; - SqliteAsyncDriftConnection(SqliteConnection db, {bool logStatements = false}) - : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { + /// [transformTableUpdates] is useful to map local table names from PowerSync that are backed by a view name + /// which is the entity that the user interacts with. + SqliteAsyncDriftConnection( + SqliteConnection db, { + bool logStatements = false, + Set Function(UpdateNotification)? transformTableUpdates, + }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { - var setUpdates = {}; - for (var tableName in event.tables) { - setUpdates.add(TableUpdate(tableName)); + final Set setUpdates; + if (transformTableUpdates != null) { + setUpdates = transformTableUpdates(event); + } else { + setUpdates = {}; + for (var tableName in event.tables) { + setUpdates.add(TableUpdate(tableName)); + } } super.streamQueries.handleTableUpdates(setUpdates); }); diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 77f250d..dc19fe7 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.3 +version: 0.2.6 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -15,7 +15,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: drift: ">=2.28.0 <3.0.0" - sqlite_async: ^0.11.8 + sqlite_async: ^0.13.0 dev_dependencies: build_runner: ^2.4.8 diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index 339cc04..cac6f55 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -11,6 +11,7 @@ import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; import './utils/test_utils.dart'; +import 'generated/database.dart'; class EmptyDatabase extends GeneratedDatabase { EmptyDatabase(super.executor); @@ -182,7 +183,7 @@ void main() { {'description': 'Test 1'}, {'description': 'Test 3'} ])); - }, skip: 'sqlite_async does not support nested transactions'); + }); test('Concurrent select', () async { var completer1 = Completer(); @@ -245,4 +246,43 @@ INSERT INTO test_data(description) VALUES('test data'); expect(row, isEmpty); }); }); + + test('transform table updates', () async { + final path = dbPath(); + await cleanDb(path: path); + + final db = await setupDatabase(path: path); + final connection = SqliteAsyncDriftConnection( + db, + // tables with the local_ prefix are mapped to the name without the prefix + transformTableUpdates: (event) { + final updates = {}; + + for (final originalTableName in event.tables) { + final effectiveName = originalTableName.startsWith("local_") + ? originalTableName.substring(6) + : originalTableName; + updates.add(TableUpdate(effectiveName)); + } + + return updates; + }, + ); + + // Create table with a different name than drift. (Mimicking a table name backed by a view in PowerSync with the optional sync strategy) + await db.execute( + 'CREATE TABLE local_todos(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)', + ); + + final dbu = TodoDatabase.fromSqliteAsyncConnection(connection); + + final tableUpdatesFut = + dbu.tableUpdates(TableUpdateQuery.onTableName("todos")).first; + + // This insert will trigger the sqlite_async "updates" stream + await db.execute("INSERT INTO local_todos(description) VALUES('Test 1')"); + + expect(await tableUpdatesFut.timeout(const Duration(seconds: 2)), + {TableUpdate("todos")}); + }); } diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index 928c7dd..4e03600 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -16,6 +16,8 @@ class TodoItems extends Table { class TodoDatabase extends _$TodoDatabase { TodoDatabase(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); + TodoDatabase.fromSqliteAsyncConnection(SqliteAsyncDriftConnection super.conn); + @override int get schemaVersion => 1; } diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 387c944..d833558 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,19 @@ +## 0.13.0 + + - Update sqlite3_web to 0.4.0 + +## 0.12.2 + + - Add `withAllConnections` method to run statements on all connections in the pool. + +## 0.12.1 + +- Fix distributing updates from shared worker. + +## 0.12.0 + + - Avoid large transactions creating a large internal update queue. + ## 0.11.8 - Support nested transactions (emulated with `SAVEPOINT` statements). diff --git a/packages/sqlite_async/lib/src/common/isolate_connection_factory.dart b/packages/sqlite_async/lib/src/common/isolate_connection_factory.dart index 90dc9b1..87e6176 100644 --- a/packages/sqlite_async/lib/src/common/isolate_connection_factory.dart +++ b/packages/sqlite_async/lib/src/common/isolate_connection_factory.dart @@ -30,8 +30,8 @@ abstract class IsolateConnectionFactory SerializedPortClient get upstreamPort; factory IsolateConnectionFactory( - {required openFactory, - required mutex, + {required dynamic /* platform-specific type */ openFactory, + required dynamic /* platform-specific type */ mutex, required SerializedPortClient upstreamPort}) { return IsolateConnectionFactoryImpl( openFactory: openFactory, diff --git a/packages/sqlite_async/lib/src/common/port_channel_native.dart b/packages/sqlite_async/lib/src/common/port_channel_native.dart index bb8318e..c9f8e6b 100644 --- a/packages/sqlite_async/lib/src/common/port_channel_native.dart +++ b/packages/sqlite_async/lib/src/common/port_channel_native.dart @@ -128,7 +128,7 @@ class ParentPortClient implements PortClient { _close(); } - tieToIsolate(Isolate isolate) { + void tieToIsolate(Isolate isolate) { _isolateDebugName = isolate.debugName; isolate.addErrorListener(_errorPort.sendPort); isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); @@ -207,7 +207,7 @@ class RequestPortServer { RequestPortServer(this.port); - open(Future Function(Object? message) handle) { + PortServer open(Future Function(Object? message) handle) { return PortServer.forSendPort(port, handle); } } diff --git a/packages/sqlite_async/lib/src/common/port_channel_stub.dart b/packages/sqlite_async/lib/src/common/port_channel_stub.dart index 6c6e5cc..d5c50ae 100644 --- a/packages/sqlite_async/lib/src/common/port_channel_stub.dart +++ b/packages/sqlite_async/lib/src/common/port_channel_stub.dart @@ -53,7 +53,7 @@ class ParentPortClient implements PortClient { _stub(); } - tieToIsolate(Isolate isolate) { + void tieToIsolate(Isolate isolate) { _stub(); } } @@ -97,7 +97,7 @@ class RequestPortServer { RequestPortServer(this.port); - open(Future Function(Object? message) handle) { + PortServer open(Future Function(Object? message) handle) { _stub(); } } diff --git a/packages/sqlite_async/lib/src/common/sqlite_database.dart b/packages/sqlite_async/lib/src/common/sqlite_database.dart index 3cb12bb..a73828a 100644 --- a/packages/sqlite_async/lib/src/common/sqlite_database.dart +++ b/packages/sqlite_async/lib/src/common/sqlite_database.dart @@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries { /// /// Use this to access the database in background isolates. IsolateConnectionFactory isolateConnectionFactory(); + + /// Locks all underlying connections making up this database, and gives [block] access to all of them at once. + /// This can be useful to run the same statement on all connections. For instance, + /// ATTACHing a database, that is expected to be available in all connections. + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block); } /// A SQLite database instance. @@ -62,7 +70,7 @@ abstract class SqliteDatabase /// /// A maximum of [maxReaders] concurrent read transactions are allowed. factory SqliteDatabase( - {required path, + {required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, SqliteOptions options = const SqliteOptions.defaults()}) { return SqliteDatabaseImpl( diff --git a/packages/sqlite_async/lib/src/impl/single_connection_database.dart b/packages/sqlite_async/lib/src/impl/single_connection_database.dart index 4cd3144..7ca4357 100644 --- a/packages/sqlite_async/lib/src/impl/single_connection_database.dart +++ b/packages/sqlite_async/lib/src/impl/single_connection_database.dart @@ -57,4 +57,12 @@ final class SingleConnectionDatabase return connection.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(connection, [])); + } } diff --git a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart index 29db641..cc106a7 100644 --- a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart @@ -20,7 +20,7 @@ class SqliteDatabaseImpl int maxReaders; factory SqliteDatabaseImpl( - {required path, + {required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, SqliteOptions options = const SqliteOptions.defaults()}) { throw UnimplementedError(); @@ -64,4 +64,12 @@ class SqliteDatabaseImpl Future getAutoCommit() { throw UnimplementedError(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + throw UnimplementedError(); + } } diff --git a/packages/sqlite_async/lib/src/native/database/connection_pool.dart b/packages/sqlite_async/lib/src/native/database/connection_pool.dart index 9521b34..8dab27e 100644 --- a/packages/sqlite_async/lib/src/native/database/connection_pool.dart +++ b/packages/sqlite_async/lib/src/native/database/connection_pool.dart @@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { final MutexImpl mutex; + int _runningWithAllConnectionsCount = 0; + @override bool closed = false; @@ -88,6 +90,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return; } + if (_availableReadConnections.isEmpty && + _runningWithAllConnectionsCount > 0) { + // Wait until [withAllConnections] is done. Otherwise we could spawn a new + // reader while the user is configuring all the connections, + // e.g. a global open factory configuration shared across all connections. + return; + } + var nextItem = _queue.removeFirst(); while (nextItem.completer.isCompleted) { // This item already timed out - try the next one if available @@ -232,6 +242,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await connection.refreshSchema(); } } + + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) async { + try { + _runningWithAllConnectionsCount++; + + final blockCompleter = Completer(); + final (write, reads) = await _lockAllConns(blockCompleter); + + try { + final res = await block(write, reads); + blockCompleter.complete(res); + return res; + } catch (e, st) { + blockCompleter.completeError(e, st); + rethrow; + } + } finally { + _runningWithAllConnectionsCount--; + + // Continue processing any pending read requests that may have been queued while + // the block was running. + Timer.run(_nextRead); + } + } + + /// Locks all connections, returning the acquired contexts. + /// We pass a completer that would be called after the locks are taken. + Future<(SqliteWriteContext, List)> _lockAllConns( + Completer lockCompleter) async { + final List> readLockedCompleters = []; + final Completer writeLockedCompleter = Completer(); + + // Take the write lock + writeLock((ctx) { + writeLockedCompleter.complete(ctx); + return lockCompleter.future; + }); + + // Take all the read locks + for (final readConn in _allReadConnections) { + final completer = Completer(); + readLockedCompleters.add(completer); + + readConn.readLock((ctx) { + completer.complete(ctx); + return lockCompleter.future; + }); + } + + // Wait after all locks are taken + final [writer as SqliteWriteContext, ...readers] = await Future.wait([ + writeLockedCompleter.future, + ...readLockedCompleters.map((e) => e.future) + ]); + + return (writer, readers); + } } typedef ReadCallback = Future Function(SqliteReadContext tx); diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 301d0af..e90739e 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,7 +303,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - db.throttledUpdatedTables.listen((changedTables) { + db.updatedTables.listen((changedTables) { client.fire(UpdateNotification(changedTables)); }); diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart index 7bea111..85b0dd0 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart @@ -54,7 +54,7 @@ class SqliteDatabaseImpl /// /// A maximum of [maxReaders] concurrent read transactions are allowed. factory SqliteDatabaseImpl( - {required path, + {required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, SqliteOptions options = const SqliteOptions.defaults()}) { final factory = @@ -171,4 +171,12 @@ class SqliteDatabaseImpl Future refreshSchema() { return _pool.refreshSchema(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return _pool.withAllConnections(block); + } } diff --git a/packages/sqlite_async/lib/src/native/native_isolate_connection_factory.dart b/packages/sqlite_async/lib/src/native/native_isolate_connection_factory.dart index 915d2ad..5cbb1a7 100644 --- a/packages/sqlite_async/lib/src/native/native_isolate_connection_factory.dart +++ b/packages/sqlite_async/lib/src/native/native_isolate_connection_factory.dart @@ -75,7 +75,7 @@ class _IsolateUpdateListener { return controller.stream; } - close() { + void close() { client.fire(UnsubscribeToUpdates(port.sendPort)); controller.close(); port.close(); diff --git a/packages/sqlite_async/lib/src/native/native_isolate_mutex.dart b/packages/sqlite_async/lib/src/native/native_isolate_mutex.dart index 7d82f68..9d1a212 100644 --- a/packages/sqlite_async/lib/src/native/native_isolate_mutex.dart +++ b/packages/sqlite_async/lib/src/native/native_isolate_mutex.dart @@ -159,7 +159,7 @@ class SharedMutex implements Mutex { }, zoneValues: {this: true}); } - _unlock() { + void _unlock() { client.fire(const _UnlockMessage()); } diff --git a/packages/sqlite_async/lib/src/sqlite_migrations.dart b/packages/sqlite_async/lib/src/sqlite_migrations.dart index a0c9991..7f2e5d8 100644 --- a/packages/sqlite_async/lib/src/sqlite_migrations.dart +++ b/packages/sqlite_async/lib/src/sqlite_migrations.dart @@ -25,7 +25,7 @@ class SqliteMigrations { SqliteMigrations({this.migrationTable = "_migrations"}); - add(SqliteMigration migration) { + void add(SqliteMigration migration) { assert( migrations.isEmpty || migrations.last.toVersion < migration.toVersion); @@ -48,7 +48,7 @@ class SqliteMigrations { } /// The current version as specified by the migrations. - get version { + int get version { return migrations.isEmpty ? 0 : migrations.last.toVersion; } @@ -68,7 +68,7 @@ class SqliteMigrations { } } - _validateCreateDatabase() { + void _validateCreateDatabase() { if (createDatabase != null) { if (createDatabase!.downMigration != null) { throw MigrationError("createDatabase may not contain down migrations"); @@ -229,7 +229,7 @@ class SqliteDownMigration { SqliteDownMigration({required this.toVersion}); /// Add an statement to execute to downgrade the database version. - add(String sql, [List? params]) { + void add(String sql, [List? params]) { _statements.add(_SqliteMigrationStatement(sql, params ?? [])); } } diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..542611e 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -79,68 +79,85 @@ List mapParameters(List parameters) { } extension ThrottledUpdates on CommonDatabase { - /// Wraps [updatesSync] to: + /// An unthrottled stream of updated tables that emits on every commit. /// - /// - Not fire in transactions. - /// - Fire asynchronously. - /// - Only report table names, which are buffered to avoid duplicates. - Stream> get throttledUpdatedTables { - StreamController>? controller; - var pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; + /// A paused subscription on this stream will buffer changed tables into a + /// growing set instead of losing events, so this stream is simple to throttle + /// downstream. + Stream> get updatedTables { + final listeners = <_UpdateListener>[]; + var uncommitedUpdates = {}; + var underlyingSubscriptions = >[]; + + void handleUpdate(SqliteUpdate update) { + uncommitedUpdates.add(update.tableName); + } + + void afterCommit() { + for (final listener in listeners) { + listener.notify(uncommitedUpdates); } - if (!autocommit) { - // Inside a transaction - do not fire updates - return; + uncommitedUpdates.clear(); + } + + void afterRollback() { + uncommitedUpdates.clear(); + } + + void addListener(_UpdateListener listener) { + listeners.add(listener); + + if (listeners.length == 1) { + // First listener, start listening for raw updates on underlying + // database. + underlyingSubscriptions = [ + updatesSync.listen(handleUpdate), + commits.listen((_) => afterCommit()), + commits.listen((_) => afterRollback()) + ]; } + } - if (pendingUpdates.isNotEmpty) { - controller!.add(pendingUpdates); - pendingUpdates = {}; + void removeListener(_UpdateListener listener) { + listeners.remove(listener); + if (listeners.isEmpty) { + for (final sub in underlyingSubscriptions) { + sub.cancel(); + } } } - void collectUpdate(SqliteUpdate event) { - pendingUpdates.add(event.tableName); + return Stream.multi( + (listener) { + final wrapped = _UpdateListener(listener); + addListener(wrapped); - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); + listener.onResume = wrapped.addPending; + listener.onCancel = () => removeListener(wrapped); + }, + isBroadcast: true, + ); + } +} + +class _UpdateListener { + final MultiStreamController> downstream; + Set buffered = {}; + + _UpdateListener(this.downstream); + + void notify(Set pendingUpdates) { + buffered.addAll(pendingUpdates); + if (!downstream.isPaused) { + addPending(); } + } - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = commits.listen((_) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; + void addPending() { + if (buffered.isNotEmpty) { + downstream.add(buffered); + buffered = {}; + } } } diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index cfaf987..23424fc 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -1,11 +1,9 @@ import 'dart:async'; import 'dart:developer'; import 'dart:js_interop'; -import 'dart:js_interop_unsafe'; import 'package:sqlite3/common.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; -import 'package:sqlite3_web/protocol_utils.dart' as proto; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; @@ -97,24 +95,17 @@ class WebDatabase @override Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { - if (_mutex case var mutex?) { - return await mutex.lock(timeout: lockTimeout, () { - return ScopedReadContext.assumeReadLock( - _UnscopedContext(this), callback); - }); - } else { - // No custom mutex, coordinate locks through shared worker. - await _database.customRequest( - CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); - - try { - return await ScopedReadContext.assumeReadLock( - _UnscopedContext(this), callback); - } finally { - await _database.customRequest( - CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); - } - } + // Since there is only a single physical connection per database on the web, + // we can't enable concurrent readers to a writer. Even supporting multiple + // readers alone isn't safe, since these readers could start read + // transactions where we need to block other tabs from sending `BEGIN` and + // `COMMIT` statements if they were to start their own transactions. + return _lockInternal( + (unscoped) => ScopedReadContext.assumeReadLock(unscoped, callback), + lockTimeout: lockTimeout, + debugContext: debugContext, + flush: false, + ); } @override @@ -122,47 +113,67 @@ class WebDatabase Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, bool? flush}) { - return writeLock((writeContext) { - return ScopedWriteContext.assumeWriteLock( - _UnscopedContext(this), - (ctx) async { - return await ctx.writeTransaction(callback); - }, - ); - }, - debugContext: 'writeTransaction()', - lockTimeout: lockTimeout, - flush: flush); + return _lockInternal( + (context) { + return ScopedWriteContext.assumeWriteLock( + context, + (ctx) async { + return await ctx.writeTransaction(callback); + }, + ); + }, + flush: flush ?? true, + lockTimeout: lockTimeout, + ); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}) async { + return await _lockInternal( + (unscoped) { + return ScopedWriteContext.assumeWriteLock(unscoped, callback); + }, + flush: flush ?? true, + debugContext: debugContext, + lockTimeout: lockTimeout, + ); + } + + Future _lockInternal( + Future Function(_UnscopedContext) callback, { + required bool flush, + Duration? lockTimeout, + String? debugContext, + }) async { if (_mutex case var mutex?) { return await mutex.lock(timeout: lockTimeout, () async { - final context = _UnscopedContext(this); + final context = _UnscopedContext(this, null); try { - return await ScopedWriteContext.assumeWriteLock(context, callback); + return await callback(context); } finally { - if (flush != false) { + if (flush) { await this.flush(); } } }); } else { - // No custom mutex, coordinate locks through shared worker. - await _database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.requestExclusiveLock)); - final context = _UnscopedContext(this); - try { - return await ScopedWriteContext.assumeWriteLock(context, callback); - } finally { - if (flush != false) { - await this.flush(); + final abortTrigger = switch (lockTimeout) { + null => null, + final duration => Future.delayed(duration), + }; + + return await _database.requestLock(abortTrigger: abortTrigger, + (token) async { + final context = _UnscopedContext(this, token); + try { + return await callback(context); + } finally { + if (flush) { + await this.flush(); + } } - await _database.customRequest( - CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); - } + }); } } @@ -171,14 +182,33 @@ class WebDatabase await isInitialized; return _database.fileSystem.flush(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(this, [])); + } } final class _UnscopedContext extends UnscopedContext { final WebDatabase _database; + /// If this context is scoped to a lock on the database, the [LockToken] from + /// `package:sqlite3_web`. + /// + /// This token needs to be passed to queries to run them. While a lock token + /// exists on the database, all queries not passing that token are blocked. + final LockToken? _lock; + final TimelineTask? _task; - _UnscopedContext(this._database) + /// Whether statements should be rejected if the database is not in an + /// autocommit state. + bool _checkInTransaction = false; + + _UnscopedContext(this._database, this._lock) : _task = _database.profileQueries ? TimelineTask() : null; @override @@ -205,8 +235,15 @@ final class _UnscopedContext extends UnscopedContext { sql: sql, parameters: parameters, () async { - return await wrapSqliteException( - () => _database._database.select(sql, parameters)); + return await wrapSqliteException(() async { + final result = await _database._database.select( + sql, + parameters: parameters, + token: _lock, + checkInTransaction: _checkInTransaction, + ); + return result.result; + }); }, ); } @@ -226,8 +263,15 @@ final class _UnscopedContext extends UnscopedContext { @override Future execute(String sql, [List parameters = const []]) { return _task.timeAsync('execute', sql: sql, parameters: parameters, () { - return wrapSqliteException( - () => _database._database.select(sql, parameters)); + return wrapSqliteException(() async { + final result = await _database._database.select( + sql, + parameters: parameters, + token: _lock, + checkInTransaction: _checkInTransaction, + ); + return result.result; + }); }); } @@ -238,7 +282,12 @@ final class _UnscopedContext extends UnscopedContext { for (final set in parameterSets) { // use execute instead of select to avoid transferring rows from the // worker to this context. - await _database._database.execute(sql, set); + await _database._database.execute( + sql, + parameters: set, + token: _lock, + checkInTransaction: _checkInTransaction, + ); } }); }); @@ -248,75 +297,7 @@ final class _UnscopedContext extends UnscopedContext { UnscopedContext interceptOutermostTransaction() { // All execute calls done in the callback will be checked for the // autocommit state - return _ExclusiveTransactionContext(_database); - } -} - -final class _ExclusiveTransactionContext extends _UnscopedContext { - _ExclusiveTransactionContext(super._database); - - Future _executeInternal( - String sql, List parameters) async { - // Operations inside transactions are executed with custom requests - // in order to verify that the connection does not have autocommit enabled. - // The worker will check if autocommit = true before executing the SQL. - // An exception will be thrown if autocommit is enabled. - // The custom request which does the above will return the ResultSet as a formatted - // JavaScript object. This is the converted into a Dart ResultSet. - return await wrapSqliteException(() async { - var res = await _database._database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.executeInTransaction, sql, parameters)) - as JSObject; - - if (res.has('format') && (res['format'] as JSNumber).toDartInt == 2) { - // Newer workers use a serialization format more efficient than dartify(). - return proto.deserializeResultSet(res['r'] as JSObject); - } - - var result = Map.from(res.dartify() as Map); - final columnNames = [ - for (final entry in result['columnNames']) entry as String - ]; - final rawTableNames = result['tableNames']; - final tableNames = rawTableNames != null - ? [ - for (final entry in (rawTableNames as List)) - entry as String - ] - : null; - - final rows = >[]; - for (final row in (result['rows'] as List)) { - final dartRow = []; - - for (final column in (row as List)) { - dartRow.add(column); - } - - rows.add(dartRow); - } - final resultSet = ResultSet(columnNames, tableNames, rows); - return resultSet; - }); - } - - @override - Future execute(String sql, - [List parameters = const []]) async { - return _task.timeAsync('execute', sql: sql, parameters: parameters, () { - return _executeInternal(sql, parameters); - }); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - return _task.timeAsync('executeBatch', sql: sql, () async { - for (final set in parameterSets) { - await _database._database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.executeBatchInTransaction, sql, set)); - } - }); + return _UnscopedContext(_database, _lock).._checkInTransaction = true; } } @@ -329,6 +310,11 @@ Future wrapSqliteException(Future Function() callback) async { throw serializedCause; } + if (ex.message.contains('Database is not in a transaction')) { + throw SqliteException( + 0, "Transaction rolled back by earlier statement. Cannot execute."); + } + // Older versions of package:sqlite_web reported SqliteExceptions as strings // only. if (ex.toString().contains('SqliteException')) { diff --git a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart index c6d1b75..e129afa 100644 --- a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart @@ -56,7 +56,7 @@ class SqliteDatabaseImpl /// /// A maximum of [maxReaders] concurrent read transactions are allowed. factory SqliteDatabaseImpl( - {required path, + {required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, SqliteOptions options = const SqliteOptions.defaults()}) { final factory = @@ -178,4 +178,12 @@ class SqliteDatabaseImpl Future exposeEndpoint() async { return await _connection.exposeEndpoint(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(_connection, [])); + } } diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index d17c06b..9fcbb57 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -6,12 +6,8 @@ import 'dart:js_interop'; import 'package:sqlite3_web/protocol_utils.dart' as proto; enum CustomDatabaseMessageKind { - requestSharedLock, - requestExclusiveLock, - releaseLock, - lockObtained, + ok, getAutoCommit, - executeInTransaction, executeBatchInTransaction, updateSubscriptionManagement, notifyUpdates, diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 513b1f2..29248d5 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -63,10 +63,19 @@ class DefaultSqliteOpenFactory final workers = await _initialized; final connection = await connectToWorker(workers, path); - // When the database is accessed through a shared worker, we implement - // mutexes over custom messages sent through the shared worker. In other - // cases, we need to implement a mutex locally. - final mutex = connection.access == AccessMode.throughSharedWorker + // When the database is hosted in a shared worker, we don't need a local + // mutex since that worker will hand out leases for us. + // Additionally, package:sqlite3_web uses navigator locks internally for + // OPFS databases. + // Technically, the only other implementation (IndexedDB in a local context + // or a dedicated worker) is inherently unsafe to use across tabs. But + // wrapping those in a mutex and flushing the file system helps a little bit + // (still something we're trying to avoid). + final hasSqliteWebMutex = + connection.access == AccessMode.throughSharedWorker || + connection.storage == StorageMode.opfs; + + final mutex = hasSqliteWebMutex ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 0c3e8f7..164c8ff 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,6 +1,5 @@ import 'dart:async'; import 'dart:js_interop'; -import 'dart:js_interop_unsafe'; import 'package:meta/meta.dart'; import 'package:mutex/mutex.dart'; @@ -56,18 +55,12 @@ class AsyncSqliteDatabase extends WorkerDatabase { final Map _state = {}; AsyncSqliteDatabase({required this.database}) - : _updates = database.throttledUpdatedTables; + : _updates = database.updatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); } - void _markHoldsMutex(ClientConnection connection) { - final state = _findState(connection); - state.holdsMutex = true; - _registerCloseListener(state, connection); - } - void _registerCloseListener( _ConnectionState state, ClientConnection connection) { if (!state.hasOnCloseListener) { @@ -87,44 +80,11 @@ class AsyncSqliteDatabase extends WorkerDatabase { final message = request as CustomDatabaseMessage; switch (message.kind) { - case CustomDatabaseMessageKind.requestSharedLock: - await mutex.acquireRead(); - _markHoldsMutex(connection); - case CustomDatabaseMessageKind.requestExclusiveLock: - await mutex.acquireWrite(); - _markHoldsMutex(connection); - case CustomDatabaseMessageKind.releaseLock: - _findState(connection).holdsMutex = false; - mutex.release(); - case CustomDatabaseMessageKind.lockObtained: + case CustomDatabaseMessageKind.ok: case CustomDatabaseMessageKind.notifyUpdates: throw UnsupportedError('This is a response, not a request'); case CustomDatabaseMessageKind.getAutoCommit: return database.autocommit.toJS; - case CustomDatabaseMessageKind.executeInTransaction: - final sql = message.rawSql.toDart; - final hasTypeInfo = message.typeInfo.isDefinedAndNotNull; - final parameters = proto.deserializeParameters( - message.rawParameters, message.typeInfo); - if (database.autocommit) { - throw SqliteException(0, - "Transaction rolled back by earlier statement. Cannot execute: $sql"); - } - - var res = database.select(sql, parameters); - if (hasTypeInfo) { - // If the client is sending a request that has parameters with type - // information, it will also support a newer serialization format for - // result sets. - return JSObject() - ..['format'] = 2.toJS - ..['r'] = proto.serializeResultSet(res); - } else { - var dartMap = resultSetToMap(res); - var jsObject = dartMap.jsify(); - return jsObject; - } - case CustomDatabaseMessageKind.executeBatchInTransaction: final sql = message.rawSql.toDart; final parameters = proto.deserializeParameters( @@ -144,19 +104,20 @@ class AsyncSqliteDatabase extends WorkerDatabase { state.unsubscribeUpdates(); _registerCloseListener(state, connection); - state.updatesNotification = _updates.listen((tables) { - connection.customRequest(CustomDatabaseMessage( + late StreamSubscription subscription; + subscription = state.updatesNotification = _updates.listen((tables) { + subscription.pause(connection.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.notifyUpdates, id, tables.toList(), - )); + ))); }); } else { state.unsubscribeUpdates(); } } - return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); + return CustomDatabaseMessage(CustomDatabaseMessageKind.ok); } Map resultSetToMap(ResultSet resultSet) { diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 464bd2d..1ddd431 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.11.8 +version: 0.13.0 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" @@ -12,8 +12,8 @@ topics: - flutter dependencies: - sqlite3: ^2.9.0 - sqlite3_web: ^0.3.2 + sqlite3: ^2.9.4 + sqlite3_web: ^0.4.0 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 @@ -23,8 +23,8 @@ dependencies: dev_dependencies: build_runner: ^2.4.14 build_web_compilers: ^4.1.1 - build_test: ^2.2.3 - lints: ^5.0.0 + build_test: ^3.5.0 + lints: ^6.0.0 test: ^1.21.0 test_api: ^0.7.0 glob: ^2.1.1 diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index 6a315da..e2914b3 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -7,6 +7,7 @@ import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm'); +const _isWeb = identical(0, 0.0) || _isDart2Wasm; void main() { group('Shared Basic Tests', () { @@ -301,6 +302,49 @@ void main() { 'Web locks are managed with a shared worker, which does not support timeouts', ) }); + + test('with all connections', () async { + final maxReaders = _isWeb ? 0 : 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + // Warm up to spawn the max readers + await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + final parentZone = Zone.current; + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + // Need a root zone here to avoid recursive lock errors. + readsCalledWhileWithAllConnsRunning = + Future(parentZone.bindCallback(() async { + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + await Future.delayed(const Duration(milliseconds: 100)); + }); + }), + ); + })); + + await Future.delayed(const Duration(milliseconds: 200)); + finishedWithAllConns = true; + }); + + await readsCalledWhileWithAllConnsRunning; + }); }); } diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart index dec1fed..3f348e6 100644 --- a/packages/sqlite_async/test/native/basic_test.dart +++ b/packages/sqlite_async/test/native/basic_test.dart @@ -2,12 +2,16 @@ library; import 'dart:async'; +import 'dart:io'; import 'dart:math'; +import 'package:collection/collection.dart'; +import 'package:path/path.dart' show join; import 'package:sqlite3/common.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; +import '../utils/abstract_test_utils.dart'; import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); @@ -100,6 +104,126 @@ void main() { print("${DateTime.now()} done"); }); + test('prevent opening new readers while in withAllConnections', () async { + final sharedStateDir = Directory.systemTemp.createTempSync(); + addTearDown(() => sharedStateDir.deleteSync(recursive: true)); + + final File sharedStateFile = + File(join(sharedStateDir.path, 'shared-state.txt')); + + sharedStateFile.writeAsStringSync('initial'); + + final db = SqliteDatabase.withFactory( + _TestSqliteOpenFactoryWithSharedStateFile( + path: path, sharedStateFilePath: sharedStateFile.path), + maxReaders: 3); + await db.initialize(); + await createTables(db); + + // The writer saw 'initial' in the file when opening the connection + expect( + await db + .writeLock((c) => c.get('SELECT file_contents_on_open() AS state')), + {'state': 'initial'}, + ); + + final withAllConnectionsCompleter = Completer(); + + final withAllConnsFut = db.withAllConnections((writer, readers) async { + expect(readers.length, 0); // No readers yet + + // Simulate some work until the file is updated + await Future.delayed(const Duration(milliseconds: 200)); + sharedStateFile.writeAsStringSync('updated'); + + await withAllConnectionsCompleter.future; + }); + + // Start a reader that gets the contents of the shared file + bool readFinished = false; + final someReadFut = + db.get('SELECT file_contents_on_open() AS state', []).then((r) { + readFinished = true; + return r; + }); + + // The withAllConnections should prevent the reader from opening + await Future.delayed(const Duration(milliseconds: 100)); + expect(readFinished, isFalse); + + // Free all the locks + withAllConnectionsCompleter.complete(); + await withAllConnsFut; + + final readerInfo = await someReadFut; + expect(readFinished, isTrue); + // The read should see the updated value in the file. This checks + // that a reader doesn't spawn while running withAllConnections + expect(readerInfo, {'state': 'updated'}); + }); + + test('with all connections', () async { + final maxReaders = 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + Future readWithRandomDelay( + SqliteReadContext ctx, int id) async { + return await ctx.get( + 'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection', + [id, 5 + Random().nextInt(10)]); + } + + // Warm up to spawn the max readers + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)), + ); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + print("${DateTime.now()} start"); + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + readsCalledWhileWithAllConnsRunning = Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + final r = await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + return await readWithRandomDelay(c, i); + }); + print( + "${DateTime.now()} After withAllConnections, started while running $r"); + }), + ); + + await Future.wait([ + writer.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *", + [ + 123, + 5 + Random().nextInt(20) + ]).then((value) => + print("${DateTime.now()} withAllConnections writer done $value")), + ...readers + .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) { + print( + "${DateTime.now()} withAllConnections readers done $results"); + })) + ]); + }).then((_) => finishedWithAllConns = true); + + await readsCalledWhileWithAllConnsRunning; + }); + test('read-only transactions', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); @@ -379,3 +503,31 @@ class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory { ]; } } + +class _TestSqliteOpenFactoryWithSharedStateFile + extends TestDefaultSqliteOpenFactory { + final String sharedStateFilePath; + + _TestSqliteOpenFactoryWithSharedStateFile( + {required super.path, required this.sharedStateFilePath}); + + @override + sqlite.CommonDatabase open(SqliteOpenOptions options) { + final File sharedStateFile = File(sharedStateFilePath); + final String sharedState = sharedStateFile.readAsStringSync(); + + final db = super.open(options); + + // Function to return the contents of the shared state file at the time of opening + // so that we know at which point the factory was called. + db.createFunction( + functionName: 'file_contents_on_open', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + return sharedState; + }, + ); + + return db; + } +} diff --git a/packages/sqlite_async/test/native/watch_test.dart b/packages/sqlite_async/test/native/watch_test.dart index 4e4fb83..0a97b17 100644 --- a/packages/sqlite_async/test/native/watch_test.dart +++ b/packages/sqlite_async/test/native/watch_test.dart @@ -7,6 +7,7 @@ import 'dart:math'; import 'package:sqlite3/common.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:test/test.dart'; import '../utils/test_utils_impl.dart'; @@ -31,6 +32,51 @@ void main() { return db; }); + test('raw update notifications', () async { + final factory = await testUtils.testFactory(path: path); + final db = factory + .openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false)); + + db.execute('CREATE TABLE a (bar INTEGER);'); + db.execute('CREATE TABLE b (bar INTEGER);'); + final events = >[]; + final subscription = db.updatedTables.listen(events.add); + + db.execute('insert into a default values'); + expect(events, isEmpty); // should be async + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('insert into b default values'); + await pumpEventQueue(); + expect(events, isEmpty); // should only trigger on commit + db.execute('commit'); + + await pumpEventQueue(); + expect(events.removeLast(), {'a', 'b'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('rollback'); + expect(events, isEmpty); + await pumpEventQueue(); + expect(events, isEmpty); // should ignore cancelled transactions + + // Should still listen during pause, and dispatch on resume + subscription.pause(); + db.execute('insert into a default values'); + await pumpEventQueue(); + expect(events, isEmpty); + + subscription.resume(); + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + subscription.pause(); + }); + test('watch in isolate', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index dc28add..1597aa0 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -8,7 +8,7 @@ import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); -createTables(SqliteDatabase db) async { +Future createTables(SqliteDatabase db) async { await db.writeTransaction((tx) async { await tx.execute( 'CREATE TABLE assets(id INTEGER PRIMARY KEY AUTOINCREMENT, make TEXT, customer_id INTEGER)');