Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 99 additions & 121 deletions packages/sqlite_async/lib/src/web/database.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import 'dart:async';
import 'dart:developer';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';

import 'package:sqlite3/common.dart';
import 'package:sqlite3_web/sqlite3_web.dart';
import 'package:sqlite3_web/protocol_utils.dart' as proto;
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/utils/profiler.dart';
import 'package:sqlite_async/src/web/database/broadcast_updates.dart';
Expand Down Expand Up @@ -97,72 +95,85 @@ class WebDatabase
@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
if (_mutex case var mutex?) {
return await mutex.lock(timeout: lockTimeout, () {
return ScopedReadContext.assumeReadLock(
_UnscopedContext(this), callback);
});
} else {
// No custom mutex, coordinate locks through shared worker.
await _database.customRequest(
CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock));

try {
return await ScopedReadContext.assumeReadLock(
_UnscopedContext(this), callback);
} finally {
await _database.customRequest(
CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock));
}
}
// Since there is only a single physical connection per database on the web,
// we can't enable concurrent readers to a writer. Even supporting multiple
// readers alone isn't safe, since these readers could start read
// transactions where we need to block other tabs from sending `BEGIN` and
// `COMMIT` statements if they were to start their own transactions.
return _lockInternal(
(unscoped) => ScopedReadContext.assumeReadLock(unscoped, callback),
lockTimeout: lockTimeout,
debugContext: debugContext,
flush: false,
);
}

@override
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout,
bool? flush}) {
return writeLock((writeContext) {
return ScopedWriteContext.assumeWriteLock(
_UnscopedContext(this),
(ctx) async {
return await ctx.writeTransaction(callback);
},
);
},
debugContext: 'writeTransaction()',
lockTimeout: lockTimeout,
flush: flush);
return _lockInternal(
(context) {
return ScopedWriteContext.assumeWriteLock(
context,
(ctx) async {
return await ctx.writeTransaction(callback);
},
);
},
flush: flush ?? true,
lockTimeout: lockTimeout,
);
}

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext, bool? flush}) async {
return await _lockInternal(
(unscoped) {
return ScopedWriteContext.assumeWriteLock(unscoped, callback);
},
flush: flush ?? true,
debugContext: debugContext,
lockTimeout: lockTimeout,
);
}

Future<T> _lockInternal<T>(
Future<T> Function(_UnscopedContext) callback, {
required bool flush,
Duration? lockTimeout,
String? debugContext,
}) async {
if (_mutex case var mutex?) {
return await mutex.lock(timeout: lockTimeout, () async {
final context = _UnscopedContext(this);
final context = _UnscopedContext(this, null);
try {
return await ScopedWriteContext.assumeWriteLock(context, callback);
return await callback(context);
} finally {
if (flush != false) {
if (flush) {
await this.flush();
}
}
});
} else {
// No custom mutex, coordinate locks through shared worker.
await _database.customRequest(CustomDatabaseMessage(
CustomDatabaseMessageKind.requestExclusiveLock));
final context = _UnscopedContext(this);
try {
return await ScopedWriteContext.assumeWriteLock(context, callback);
} finally {
if (flush != false) {
await this.flush();
final abortTrigger = switch (lockTimeout) {
null => null,
final duration => Future.delayed(duration),
};

return await _database.requestLock(abortTrigger: abortTrigger,
(token) async {
final context = _UnscopedContext(this, token);
try {
return await callback(context);
} finally {
if (flush) {
await this.flush();
}
}
await _database.customRequest(
CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock));
}
});
}
}

Expand All @@ -184,9 +195,20 @@ class WebDatabase
final class _UnscopedContext extends UnscopedContext {
final WebDatabase _database;

/// If this context is scoped to a lock on the database, the [LockToken] from
/// `package:sqlite3_web`.
///
/// This token needs to be passed to queries to run them. While a lock token
/// exists on the database, all queries not passing that token are blocked.
final LockToken? _lock;

final TimelineTask? _task;

_UnscopedContext(this._database)
/// Whether statements should be rejected if the database is not in an
/// autocommit state.
bool _checkInTransaction = false;

_UnscopedContext(this._database, this._lock)
: _task = _database.profileQueries ? TimelineTask() : null;

@override
Expand All @@ -213,8 +235,15 @@ final class _UnscopedContext extends UnscopedContext {
sql: sql,
parameters: parameters,
() async {
return await wrapSqliteException(
() => _database._database.select(sql, parameters));
return await wrapSqliteException(() async {
final result = await _database._database.select(
sql,
parameters: parameters,
token: _lock,
checkInTransaction: _checkInTransaction,
);
return result.result;
});
},
);
}
Expand All @@ -234,8 +263,15 @@ final class _UnscopedContext extends UnscopedContext {
@override
Future<ResultSet> execute(String sql, [List<Object?> parameters = const []]) {
return _task.timeAsync('execute', sql: sql, parameters: parameters, () {
return wrapSqliteException(
() => _database._database.select(sql, parameters));
return wrapSqliteException(() async {
final result = await _database._database.select(
sql,
parameters: parameters,
token: _lock,
checkInTransaction: _checkInTransaction,
);
return result.result;
});
});
}

Expand All @@ -246,7 +282,12 @@ final class _UnscopedContext extends UnscopedContext {
for (final set in parameterSets) {
// use execute instead of select to avoid transferring rows from the
// worker to this context.
await _database._database.execute(sql, set);
await _database._database.execute(
sql,
parameters: set,
token: _lock,
checkInTransaction: _checkInTransaction,
);
}
});
});
Expand All @@ -256,75 +297,7 @@ final class _UnscopedContext extends UnscopedContext {
UnscopedContext interceptOutermostTransaction() {
// All execute calls done in the callback will be checked for the
// autocommit state
return _ExclusiveTransactionContext(_database);
}
}

final class _ExclusiveTransactionContext extends _UnscopedContext {
_ExclusiveTransactionContext(super._database);

Future<ResultSet> _executeInternal(
String sql, List<Object?> parameters) async {
// Operations inside transactions are executed with custom requests
// in order to verify that the connection does not have autocommit enabled.
// The worker will check if autocommit = true before executing the SQL.
// An exception will be thrown if autocommit is enabled.
// The custom request which does the above will return the ResultSet as a formatted
// JavaScript object. This is the converted into a Dart ResultSet.
return await wrapSqliteException(() async {
var res = await _database._database.customRequest(CustomDatabaseMessage(
CustomDatabaseMessageKind.executeInTransaction, sql, parameters))
as JSObject;

if (res.has('format') && (res['format'] as JSNumber).toDartInt == 2) {
// Newer workers use a serialization format more efficient than dartify().
return proto.deserializeResultSet(res['r'] as JSObject);
}

var result = Map<String, dynamic>.from(res.dartify() as Map);
final columnNames = [
for (final entry in result['columnNames']) entry as String
];
final rawTableNames = result['tableNames'];
final tableNames = rawTableNames != null
? [
for (final entry in (rawTableNames as List<Object?>))
entry as String
]
: null;

final rows = <List<Object?>>[];
for (final row in (result['rows'] as List<Object?>)) {
final dartRow = <Object?>[];

for (final column in (row as List<Object?>)) {
dartRow.add(column);
}

rows.add(dartRow);
}
final resultSet = ResultSet(columnNames, tableNames, rows);
return resultSet;
});
}

@override
Future<ResultSet> execute(String sql,
[List<Object?> parameters = const []]) async {
return _task.timeAsync('execute', sql: sql, parameters: parameters, () {
return _executeInternal(sql, parameters);
});
}

@override
Future<void> executeBatch(
String sql, List<List<Object?>> parameterSets) async {
return _task.timeAsync('executeBatch', sql: sql, () async {
for (final set in parameterSets) {
await _database._database.customRequest(CustomDatabaseMessage(
CustomDatabaseMessageKind.executeBatchInTransaction, sql, set));
}
});
return _UnscopedContext(_database, _lock).._checkInTransaction = true;
}
}

Expand All @@ -337,6 +310,11 @@ Future<T> wrapSqliteException<T>(Future<T> Function() callback) async {
throw serializedCause;
}

if (ex.message.contains('Database is not in a transaction')) {
throw SqliteException(
0, "Transaction rolled back by earlier statement. Cannot execute.");
}

// Older versions of package:sqlite_web reported SqliteExceptions as strings
// only.
if (ex.toString().contains('SqliteException')) {
Expand Down
6 changes: 1 addition & 5 deletions packages/sqlite_async/lib/src/web/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ import 'dart:js_interop';
import 'package:sqlite3_web/protocol_utils.dart' as proto;

enum CustomDatabaseMessageKind {
requestSharedLock,
requestExclusiveLock,
releaseLock,
lockObtained,
ok,
getAutoCommit,
executeInTransaction,
executeBatchInTransaction,
updateSubscriptionManagement,
notifyUpdates,
Expand Down
17 changes: 13 additions & 4 deletions packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,19 @@ class DefaultSqliteOpenFactory
final workers = await _initialized;
final connection = await connectToWorker(workers, path);

// When the database is accessed through a shared worker, we implement
// mutexes over custom messages sent through the shared worker. In other
// cases, we need to implement a mutex locally.
final mutex = connection.access == AccessMode.throughSharedWorker
// When the database is hosted in a shared worker, we don't need a local
// mutex since that worker will hand out leases for us.
// Additionally, package:sqlite3_web uses navigator locks internally for
// OPFS databases.
// Technically, the only other implementation (IndexedDB in a local context
// or a dedicated worker) is inherently unsafe to use across tabs. But
// wrapping those in a mutex and flushing the file system helps a little bit
// (still something we're trying to avoid).
final hasSqliteWebMutex =
connection.access == AccessMode.throughSharedWorker ||
connection.storage == StorageMode.opfs;

final mutex = hasSqliteWebMutex
? null
: MutexImpl(identifier: path); // Use the DB path as a mutex identifier

Expand Down
Loading