Skip to content

Commit 43cfd09

Browse files
committed
Partial wa-sqlite worker implementation.
1 parent a183c89 commit 43cfd09

File tree

16 files changed

+727
-36
lines changed

16 files changed

+727
-36
lines changed

packages/driver/src/util/LazyConnectionPool.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ export class LazyConnectionPool implements SqliteDriverConnectionPool {
3333
}
3434

3535
async close(): Promise<void> {
36+
try {
3637
await this.initPromise;
3738
await this.connection!.close();
39+
} catch (e) {
40+
console.error(e);
41+
}
3842
}
3943

4044
onUpdate(

packages/wa-sqlite-driver/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"author": "",
1616
"license": "MIT",
1717
"dependencies": {
18-
"@journeyapps/wa-sqlite": "^0.3.0",
18+
"@journeyapps/wa-sqlite": "^1.3.2",
1919
"@sqlite-js/driver": "workspace:^",
2020
"async-mutex": "^0.5.0"
2121
},
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
export * from './wa-sqlite-driver.js';
1+
export * from './pool.js';
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { SqliteDriverConnectionPool } from '@sqlite-js/driver';
2+
import { LazyConnectionPool } from '@sqlite-js/driver/util';
3+
import { WorkerDriverConnection } from './worker_threads';
4+
// import { WaSqliteConnection } from './wa-sqlite-driver';
5+
6+
// export function waSqlitePool(path: string): SqliteDriverConnectionPool {
7+
// return new LazyConnectionPool(async () => {
8+
// return await WaSqliteConnection.open(path);
9+
// });
10+
// }
11+
12+
export function waSqliteWorkerPool(path: string): SqliteDriverConnectionPool {
13+
return new LazyConnectionPool(async () => {
14+
return new WorkerDriverConnection(
15+
new Worker(new URL('./wa-sqlite-worker.js', import.meta.url), {
16+
type: 'module'
17+
}),
18+
{ path }
19+
);
20+
// return await WaSqliteConnection.open(path);
21+
});
22+
}

packages/wa-sqlite-driver/src/wa-sqlite-driver.ts

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,11 @@ import {
1616
import { LazyConnectionPool } from '@sqlite-js/driver/util';
1717
import { SqliteError } from '@sqlite-js/driver';
1818
import * as mutex from 'async-mutex';
19+
import { WorkerDriverConnection } from './worker_threads';
1920

2021
// Initialize SQLite.
21-
const module = await SQLiteESMFactory();
22-
const sqlite3 = SQLite.Factory(module);
23-
24-
export function waSqlitePool(path: string): SqliteDriverConnectionPool {
25-
return new LazyConnectionPool(async () => {
26-
return await WaSqliteConnection.open(path);
27-
});
28-
}
29-
30-
// // Register a custom file system.
31-
// const vfs = await IDBBatchAtomicVFS.create('hello', module);
32-
// // @ts-ignore
33-
// sqlite3.vfs_register(vfs, true);
22+
export const module = await SQLiteESMFactory();
23+
export const sqlite3 = SQLite.Factory(module);
3424

3525
const m = new mutex.Mutex();
3626

@@ -39,7 +29,6 @@ class StatementImpl implements SqliteDriverStatement {
3929
private bindPromise?: Promise<{ error: SqliteError | null }>;
4030
private columns: string[] = [];
4131

42-
private stringRef?: number;
4332
private statementRef?: number;
4433
private done = false;
4534

@@ -55,17 +44,21 @@ class StatementImpl implements SqliteDriverStatement {
5544
return await m.runExclusive(() => this._prepare());
5645
}
5746

47+
private async getStatement() {
48+
const statementsIter = sqlite3.statements(this.db, this.source, {
49+
unscoped: true
50+
});
51+
for await (let statement of statementsIter) {
52+
return statement;
53+
}
54+
throw new Error(`No SQL statements in: ${this.source}`);
55+
}
56+
5857
async _prepare() {
5958
try {
60-
this.stringRef = sqlite3.str_new(this.db, this.source);
61-
const strValue = sqlite3.str_value(this.stringRef);
62-
const r = await sqlite3.prepare_v2(this.db, strValue);
63-
if (r == null) {
64-
throw new Error('could not prepare');
65-
}
66-
67-
this.statementRef = r?.stmt;
68-
this.columns = sqlite3.column_names(this.statementRef!);
59+
const statement = await this.getStatement();
60+
this.statementRef = statement;
61+
this.columns = sqlite3.column_names(statement);
6962
return { error: null };
7063
} catch (e: any) {
7164
return {
@@ -207,10 +200,6 @@ class StatementImpl implements SqliteDriverStatement {
207200
sqlite3.finalize(this.statementRef);
208201
this.statementRef = undefined;
209202
}
210-
if (this.stringRef) {
211-
sqlite3.str_finish(this.stringRef);
212-
this.stringRef = undefined;
213-
}
214203
}
215204

216205
finalize(): void {
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { sqlite3, module, WaSqliteConnection } from './wa-sqlite-driver';
2+
import { setupDriverWorker } from './worker_threads';
3+
import { IDBBatchAtomicVFS } from '@journeyapps/wa-sqlite/src/examples/IDBBatchAtomicVFS.js';
4+
import { OPFSCoopSyncVFS } from '@journeyapps/wa-sqlite/src/examples/OPFSCoopSyncVFS.js';
5+
6+
// Register a custom file system.
7+
// @ts-ignore
8+
const vfs = await OPFSCoopSyncVFS.create('test.db', module, {
9+
lockPolicy: 'exclusive'
10+
});
11+
// @ts-ignore
12+
sqlite3.vfs_register(vfs as any, true);
13+
14+
setupDriverWorker({
15+
async openConnection(options) {
16+
return await WaSqliteConnection.open(options.path);
17+
}
18+
});
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import {
2+
SqliteChanges,
3+
SqliteDriverConnection,
4+
SqliteDriverStatement,
5+
SqliteStepResult,
6+
UpdateListener
7+
} from '@sqlite-js/driver';
8+
import { mapError } from '@sqlite-js/driver/util';
9+
import {
10+
InferBatchResult,
11+
SqliteBind,
12+
SqliteCommand,
13+
SqliteCommandResponse,
14+
SqliteCommandType,
15+
SqliteFinalize,
16+
SqliteParse,
17+
SqliteParseResult,
18+
SqlitePrepare,
19+
SqliteReset,
20+
SqliteRun,
21+
SqliteStep,
22+
WorkerDriver
23+
} from './async-commands.js';
24+
25+
export class WorkerConnectionAdapter implements WorkerDriver {
26+
constructor(public connnection: SqliteDriverConnection) {}
27+
28+
statements = new Map<number, SqliteDriverStatement>();
29+
30+
async close() {
31+
await this.connnection.close();
32+
}
33+
34+
private requireStatement(id: number) {
35+
const statement = this.statements.get(id);
36+
if (statement == null) {
37+
throw new Error(`statement not found: ${id}`);
38+
}
39+
return statement;
40+
}
41+
42+
private _prepare(command: SqlitePrepare): void {
43+
const { id, sql } = command;
44+
45+
const existing = this.statements.get(id);
46+
if (existing != null) {
47+
throw new Error(
48+
`Replacing statement ${id} without finalizing the previous one`
49+
);
50+
}
51+
52+
const statement = this.connnection.prepare(sql, {
53+
bigint: command.bigint,
54+
persist: command.persist,
55+
rawResults: command.rawResults
56+
});
57+
this.statements.set(id, statement);
58+
}
59+
60+
private async _parse(command: SqliteParse): Promise<SqliteParseResult> {
61+
const { id } = command;
62+
const statement = this.requireStatement(id);
63+
return { columns: await statement.getColumns() };
64+
}
65+
66+
private _bind(command: SqliteBind): void {
67+
const { id, parameters } = command;
68+
const statement = this.requireStatement(id);
69+
statement.bind(parameters);
70+
}
71+
72+
private _step(command: SqliteStep): Promise<SqliteStepResult> {
73+
const { id, n, requireTransaction } = command;
74+
const statement = this.requireStatement(id);
75+
return statement.step(n, { requireTransaction });
76+
}
77+
78+
private _run(command: SqliteRun): Promise<SqliteChanges> {
79+
const { id } = command;
80+
const statement = this.requireStatement(id);
81+
return statement.run(command);
82+
}
83+
84+
private _reset(command: SqliteReset): void {
85+
const { id } = command;
86+
const statement = this.requireStatement(id);
87+
statement.reset(command);
88+
}
89+
90+
private _finalize(command: SqliteFinalize): void {
91+
const { id } = command;
92+
const statement = this.requireStatement(id);
93+
statement.finalize();
94+
this.statements.delete(id);
95+
}
96+
97+
private async _executeCommand(command: SqliteCommand): Promise<any> {
98+
switch (command.type) {
99+
case SqliteCommandType.prepare:
100+
return this._prepare(command);
101+
case SqliteCommandType.bind:
102+
return this._bind(command);
103+
case SqliteCommandType.step:
104+
return this._step(command);
105+
case SqliteCommandType.run:
106+
return this._run(command);
107+
case SqliteCommandType.reset:
108+
return this._reset(command);
109+
case SqliteCommandType.finalize:
110+
return this._finalize(command);
111+
case SqliteCommandType.parse:
112+
return this._parse(command);
113+
case SqliteCommandType.changes:
114+
return this.connnection.getLastChanges();
115+
default:
116+
throw new Error(`Unknown command: ${command.type}`);
117+
}
118+
}
119+
120+
async execute<const T extends SqliteCommand[]>(
121+
commands: T
122+
): Promise<InferBatchResult<T>> {
123+
let results: SqliteCommandResponse[] = [];
124+
125+
for (let command of commands) {
126+
try {
127+
const result = await this._executeCommand(command);
128+
results.push({ value: result });
129+
} catch (e: any) {
130+
const err = mapError(e);
131+
results.push({
132+
error: { message: err.message, stack: err.stack, code: err.code }
133+
});
134+
}
135+
}
136+
return results as InferBatchResult<T>;
137+
}
138+
139+
onUpdate(
140+
listener: UpdateListener,
141+
options?:
142+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
143+
| undefined
144+
): () => void {
145+
throw new Error('Not implemented yet');
146+
}
147+
}

0 commit comments

Comments
 (0)