Skip to content

Commit b82f30a

Browse files
committed
[WIP] readwrite pool.
1 parent 4223302 commit b82f30a

File tree

8 files changed

+91
-28
lines changed

8 files changed

+91
-28
lines changed

packages/driver/src/util/MultiConnectionPool.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export class MultiConnectionPool implements SqliteDriverConnectionPool {
2020
private _availableReadConnections: SqliteDriverConnection[] = [];
2121
private _queue: QueuedPoolItem[] = [];
2222
private _maxConnections: number;
23+
private _nextConnectionNumber = 1;
2324

2425
private options: ConnectionPoolOptions;
2526

@@ -42,7 +43,8 @@ export class MultiConnectionPool implements SqliteDriverConnectionPool {
4243
const promise = new Promise<ReservedConnection>((resolve, reject) => {
4344
this._queue.push({
4445
resolve,
45-
reject
46+
reject,
47+
options: options ?? {}
4648
});
4749
});
4850

@@ -57,7 +59,7 @@ export class MultiConnectionPool implements SqliteDriverConnectionPool {
5759
const connection = await this.factory.openConnection({
5860
...this.options,
5961
...options,
60-
connectionName: `connection-${this._allConnections.size + 1}`
62+
connectionName: `connection-${this._nextConnectionNumber++}`
6163
});
6264
this._allConnections.add(connection);
6365
return connection;
@@ -82,7 +84,7 @@ export class MultiConnectionPool implements SqliteDriverConnectionPool {
8284
let connection: SqliteDriverConnection;
8385
if (this._availableReadConnections.length == 0) {
8486
// FIXME: prevent opening more than the max
85-
connection = await this.expandPool();
87+
connection = await this.expandPool(item.options);
8688
} else {
8789
connection = this._availableReadConnections.shift()!;
8890
}

packages/driver/src/util/connection-pools.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export interface DriverFactory {
2222
export interface QueuedPoolItem {
2323
resolve: (reserved: ReservedConnection) => void;
2424
reject: (err: any) => void;
25+
options: ReserveConnectionOptions;
2526
}
2627

2728
export class ReservedConnectionImpl implements ReservedConnection {

packages/wa-sqlite-driver/src/OPFSCoopSyncVFS2.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,20 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
7272
unboundAccessHandles = new Set<FileSystemSyncAccessHandle>();
7373
accessiblePaths = new Set<string>();
7474
releaser: null | (() => void) = null;
75+
private readonly: boolean;
7576

76-
static async create(name, module) {
77-
const vfs = new OPFSCoopSyncVFS2(name, module);
77+
static async create(name: string, module, readonly: boolean) {
78+
const vfs = new OPFSCoopSyncVFS2(name, module, readonly);
7879
await Promise.all([
7980
(vfs as any).isReady(),
8081
vfs.#initialize(DEFAULT_TEMPORARY_FILES)
8182
]);
8283
return vfs;
8384
}
8485

85-
constructor(name, module) {
86+
constructor(name: string, module, readonly: boolean) {
8687
super(name, module);
88+
this.readonly = readonly;
8789
}
8890

8991
get #module() {
@@ -143,7 +145,7 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
143145
for (let i = 0; i < nTemporaryFiles; i++) {
144146
const tmpFile = await tmpDir.getFileHandle(`${i}.tmp`, { create: true });
145147
const tmpAccessHandle = await (tmpFile as any).createSyncAccessHandle({
146-
mode: 'readwrite-unsafe'
148+
mode: this.readonly ? 'read-only' : 'readwrite'
147149
});
148150
this.unboundAccessHandles.add(tmpAccessHandle);
149151
}
@@ -543,7 +545,9 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
543545
if (subPersistentFile) {
544546
subPersistentFile.accessHandle = await (
545547
subPersistentFile.fileHandle as any
546-
).createSyncAccessHandle({ mode: 'readwrite-unsafe' });
548+
).createSyncAccessHandle({
549+
mode: this.readonly ? 'read-only' : 'readwrite'
550+
});
547551
}
548552
})
549553
);
@@ -584,14 +588,18 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
584588
setTimeout(notify);
585589

586590
this.log?.(`lock requested: ${lockName}`);
587-
navigator.locks.request(lockName, { mode: 'shared' }, (lock) => {
588-
// We have the lock. Stop asking other connections for it.
589-
this.log?.(`lock acquired: ${lockName}`, lock);
590-
clearInterval(notifyId);
591-
return new Promise<() => void>((res) => {
592-
resolve(res as () => void);
593-
});
594-
});
591+
navigator.locks.request(
592+
lockName,
593+
{ mode: this.readonly ? 'shared' : 'exclusive' },
594+
(lock) => {
595+
// We have the lock. Stop asking other connections for it.
596+
this.log?.(`lock acquired: ${lockName}`, lock);
597+
clearInterval(notifyId);
598+
return new Promise<() => void>((res) => {
599+
resolve(res as () => void);
600+
});
601+
}
602+
);
595603
});
596604
}
597605
}

packages/wa-sqlite-driver/src/pool.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { SqliteDriverConnectionPool } from '@sqlite-js/driver';
22
import {
33
LazyConnectionPool,
4-
MultiConnectionPool
4+
MultiConnectionPool,
5+
ReadWriteConnectionPool
56
} from '@sqlite-js/driver/util';
67
import {
78
ReserveConnectionOptions,
@@ -32,21 +33,26 @@ export function waSqliteSingleWorker(path: string): SqliteDriverConnectionPool {
3233
}
3334

3435
export function waSqliteWorkerPool(path: string): SqliteDriverConnectionPool {
35-
return new MultiConnectionPool(
36+
return new ReadWriteConnectionPool(
3637
{
3738
async openConnection(
3839
options?: ReserveConnectionOptions & { connectionName?: string }
3940
): Promise<SqliteDriverConnection> {
41+
console.log('openConnection', options);
4042
const connection = new WorkerDriverConnection(
4143
new Worker(new URL('./wa-sqlite-worker.js', import.meta.url), {
4244
type: 'module'
4345
}),
44-
{ path }
46+
{
47+
path,
48+
readonly: options?.readonly ?? false,
49+
connectionName: options?.connectionName
50+
}
4551
);
4652
await connection.open();
4753
return connection;
4854
}
4955
},
50-
{}
56+
{ maxConnections: 5 }
5157
);
5258
}

packages/wa-sqlite-driver/src/wa-sqlite-driver.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ class StatementImpl implements SqliteDriverStatement {
190190
}
191191

192192
async _finalize() {
193+
console.log('finalizing...', this.con.path);
193194
// Wait for these to complete, but ignore any errors.
194195
// TODO: also wait for run/step to complete
195196
await this.preparePromise;
@@ -202,7 +203,7 @@ class StatementImpl implements SqliteDriverStatement {
202203
}
203204

204205
finalize(): void {
205-
this._finalize();
206+
m.runExclusive(() => this._finalize());
206207
}
207208

208209
reset(options?: ResetOptions): void {
@@ -287,6 +288,7 @@ export class WaSqliteConnection implements SqliteDriverConnection {
287288

288289
async close() {
289290
await m.runExclusive(async () => {
291+
console.log('closing...', this.path);
290292
for (let statement of this.statements) {
291293
if (statement.options.persist) {
292294
statement.finalize();

packages/wa-sqlite-driver/src/wa-sqlite-worker.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,22 @@ import { OPFSCoopSyncVFS2 } from './OPFSCoopSyncVFS2';
22
import { sqlite3, module, WaSqliteConnection } from './wa-sqlite-driver';
33
import { setupDriverWorker } from './worker_threads';
44

5-
// Register a custom file system.
6-
const vfs = await OPFSCoopSyncVFS2.create('test.db', module);
7-
// @ts-ignore
8-
sqlite3.vfs_register(vfs as any, true);
9-
5+
let vfs: OPFSCoopSyncVFS2 | null = null;
106
setupDriverWorker({
117
async openConnection(options) {
8+
// Register a custom file system.
9+
if (vfs != null) {
10+
throw new Error('Can only open one connection');
11+
}
12+
console.log('open', options);
13+
vfs = await OPFSCoopSyncVFS2.create(
14+
'test.db',
15+
module,
16+
options.readonly ?? false
17+
);
18+
// @ts-ignore
19+
sqlite3.vfs_register(vfs as any, true);
20+
1221
return await WaSqliteConnection.open(options.path, vfs);
1322
}
1423
});

packages/wa-sqlite-driver/src/worker_threads/WorkerDriverAdapter.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ export class WorkerConnectionAdapter implements WorkerDriver {
2626
constructor(public connnection: SqliteDriverConnection) {}
2727

2828
statements = new Map<number, SqliteDriverStatement>();
29+
private promise: Promise<void> = Promise.resolve();
2930

3031
async close() {
31-
await this.connnection.close();
32+
const p = this.promise.then(() => this.connnection.close());
33+
this.promise = p;
34+
return p;
3235
}
3336

3437
private requireStatement(id: number) {
@@ -123,6 +126,14 @@ export class WorkerConnectionAdapter implements WorkerDriver {
123126

124127
async execute<const T extends SqliteCommand[]>(
125128
commands: T
129+
): Promise<InferBatchResult<T>> {
130+
const p = this.promise.then(() => this._execute(commands));
131+
this.promise = p.then(() => {});
132+
return p;
133+
}
134+
135+
async _execute<const T extends SqliteCommand[]>(
136+
commands: T
126137
): Promise<InferBatchResult<T>> {
127138
let results: SqliteCommandResponse[] = [];
128139

packages/wa-sqlite-driver/test/src/concurrency.test.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ describe('concurrency tests', () => {
4242
for (let i = 0; i < 5; i++) {
4343
const p = (async () => {
4444
const start = Date.now();
45-
await using connection = await driver.reserveConnection();
45+
await using connection = await driver.reserveConnection({
46+
readonly: true
47+
});
4648

4749
using b = connection.prepare('begin immediate');
4850
await b.step();
@@ -59,5 +61,27 @@ describe('concurrency tests', () => {
5961
promises.push(p);
6062
}
6163
await Promise.all(promises);
64+
65+
for (let i = 0; i < 5; i++) {
66+
const p = (async () => {
67+
const start = Date.now();
68+
await using connection = await driver.reserveConnection({
69+
readonly: true
70+
});
71+
72+
using b = connection.prepare('begin immediate');
73+
await b.step();
74+
using s = connection.prepare('select * from test_data');
75+
const { rows } = await s.step();
76+
77+
expect(rows).toEqual([{ id: 1, data: 'test' }]);
78+
await new Promise((resolve) => setTimeout(resolve, 500));
79+
80+
using e = connection.prepare('commit');
81+
await e.step();
82+
console.log('tx done in', Date.now() - start);
83+
})();
84+
promises.push(p);
85+
}
6286
});
6387
});

0 commit comments

Comments
 (0)