Skip to content

Commit 8644fe2

Browse files
committed
WIP: Replace key with subkey.
1 parent 4168795 commit 8644fe2

File tree

3 files changed

+114
-13
lines changed

3 files changed

+114
-13
lines changed

crates/core/src/migrations.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::error::{PSResult, SQLiteError};
1212
use crate::fix_data::apply_v035_fix;
1313
use crate::sync::BucketPriority;
1414

15-
pub const LATEST_VERSION: i32 = 10;
15+
pub const LATEST_VERSION: i32 = 11;
1616

1717
pub fn powersync_migrate(
1818
ctx: *mut sqlite::context,
@@ -386,5 +386,34 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
386386
.into_db_result(local_db)?;
387387
}
388388

389+
if current_version < 11 && target_version >= 11 {
390+
let stmt = "\
391+
CREATE TABLE ps_oplog_new(
392+
bucket INTEGER NOT NULL,
393+
op_id INTEGER NOT NULL,
394+
row_type TEXT NOT NULL,
395+
row_id TEXT NOT NULL,
396+
subkey TEXT NOT NULL,
397+
data TEXT,
398+
hash INTEGER NOT NULL) STRICT;
399+
400+
INSERT INTO ps_oplog_new (bucket, op_id, row_type, row_id, subkey, data, hash)
401+
SELECT bucket, op_id, row_type, row_id, null, data, hash FROM ps_oplog;
402+
403+
DROP TABLE ps_oplog;
404+
ALTER TABLE ps_oplog_new RENAME TO ps_oplog;
405+
406+
CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id);
407+
CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id);
408+
CREATE INDEX ps_oplog_key ON ps_oplog (bucket, row_type, row_id, subkey);
409+
410+
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
411+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
412+
));
413+
";
414+
415+
local_db.exec_safe(stmt).into_db_result(local_db)?;
416+
}
417+
389418
Ok(())
390419
}

crates/core/src/sync/operations.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,16 @@ pub fn insert_bucket_operations(
3838
"\
3939
DELETE FROM ps_oplog
4040
WHERE unlikely(ps_oplog.bucket = ?1)
41-
AND ps_oplog.key = ?2
41+
AND ps_oplog.row_type = ?2
42+
AND ps_oplog.row_id = ?3
43+
AND ps_oplog.subkey = ?4
4244
RETURNING op_id, hash",
4345
)?;
4446
supersede_statement.bind_int64(1, bucket_id)?;
4547

4648
// language=SQLite
4749
let insert_statement = db.prepare_v2("\
48-
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
50+
INSERT INTO ps_oplog(bucket, op_id, subkey, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
4951
insert_statement.bind_int64(1, bucket_id)?;
5052

5153
let updated_row_statement = db.prepare_v2(
@@ -70,15 +72,25 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
7072
added_ops += 1;
7173

7274
if op == OpType::PUT || op == OpType::REMOVE {
73-
let key: String;
74-
if let (Some(object_type), Some(object_id)) = (object_type, object_id) {
75-
let subkey = line.subkey.as_ref().map(|i| &**i).unwrap_or("null");
76-
key = format!("{}/{}/{}", &object_type, &object_id, subkey);
75+
let subkey = line.subkey.as_ref().map(|i| &**i);
76+
77+
if let Some(subkey) = subkey {
78+
supersede_statement.bind_text(4, &subkey, sqlite::Destructor::STATIC)?;
7779
} else {
78-
key = String::from("");
80+
supersede_statement.bind_text(4, "", sqlite::Destructor::STATIC)?;
7981
}
8082

81-
supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?;
83+
if let Some(object_type) = object_type {
84+
supersede_statement.bind_text(2, &object_type, sqlite::Destructor::STATIC)?;
85+
} else {
86+
supersede_statement.bind_text(2, "", sqlite::Destructor::STATIC)?;
87+
}
88+
89+
if let Some(object_id) = object_id {
90+
supersede_statement.bind_text(3, &object_id, sqlite::Destructor::STATIC)?;
91+
} else {
92+
supersede_statement.bind_text(3, "", sqlite::Destructor::STATIC)?;
93+
}
8294

8395
let mut superseded = false;
8496

@@ -124,10 +136,10 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
124136
}
125137

126138
insert_statement.bind_int64(2, op_id)?;
127-
if key != "" {
128-
insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?;
139+
if let Some(subkey) = subkey {
140+
insert_statement.bind_text(3, &subkey, sqlite::Destructor::STATIC)?;
129141
} else {
130-
insert_statement.bind_null(3)?;
142+
insert_statement.bind_text(3, "", sqlite::Destructor::STATIC)?;
131143
}
132144

133145
if let (Some(object_type), Some(object_id)) = (object_type, object_id) {

dart/test/utils/migration_fixtures.dart

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// The current database version
2-
const databaseVersion = 10;
2+
const databaseVersion = 11;
33

44
/// This is the base database state that we expect at various schema versions.
55
/// Generated by loading the specific library version, and exporting the schema.
@@ -355,6 +355,54 @@ const expectedState = <int, String>{
355355
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
356356
;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
357357
''',
358+
11: r'''
359+
;CREATE TABLE ps_buckets(
360+
id INTEGER PRIMARY KEY,
361+
name TEXT NOT NULL,
362+
last_applied_op INTEGER NOT NULL DEFAULT 0,
363+
last_op INTEGER NOT NULL DEFAULT 0,
364+
target_op INTEGER NOT NULL DEFAULT 0,
365+
add_checksum INTEGER NOT NULL DEFAULT 0,
366+
op_checksum INTEGER NOT NULL DEFAULT 0,
367+
pending_delete INTEGER NOT NULL DEFAULT 0
368+
, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT
369+
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
370+
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
371+
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
372+
;CREATE TABLE "ps_oplog"(
373+
bucket INTEGER NOT NULL,
374+
op_id INTEGER NOT NULL,
375+
row_type TEXT NOT NULL,
376+
row_id TEXT NOT NULL,
377+
subkey TEXT NOT NULL,
378+
data TEXT,
379+
hash INTEGER NOT NULL) STRICT
380+
;CREATE TABLE ps_sync_state (
381+
priority INTEGER NOT NULL PRIMARY KEY,
382+
last_synced_at TEXT NOT NULL
383+
) STRICT
384+
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
385+
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
386+
;CREATE TABLE ps_updated_rows(
387+
row_type TEXT,
388+
row_id TEXT,
389+
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
390+
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
391+
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, row_type, row_id, subkey)
392+
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
393+
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
394+
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
395+
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
396+
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
397+
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
398+
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
399+
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
400+
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
401+
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
402+
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
403+
;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
404+
;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]')
405+
'''
358406
};
359407

360408
final finalState = expectedState[databaseVersion]!;
@@ -456,6 +504,17 @@ const data1 = <int, String>{
456504
(2, 3, 'lists', 'l1', '', '{}', 3)
457505
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
458506
('lists', 'l2')
507+
''',
508+
11: r'''
509+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
510+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
511+
(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0)
512+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, subkey, data, hash) VALUES
513+
(1, 1, 'todos', 't1', '', '{}', 100),
514+
(1, 2, 'todos', 't2', '', '{}', 20),
515+
(2, 3, 'lists', 'l1', '', '{}', 3)
516+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
517+
('lists', 'l2')
459518
'''
460519
};
461520

@@ -501,6 +560,7 @@ final dataDown1 = <int, String>{
501560
7: data1[5]!,
502561
8: data1[5]!,
503562
9: data1[9]!,
563+
10: data1[10]!,
504564
};
505565

506566
final finalData1 = data1[databaseVersion]!;

0 commit comments

Comments
 (0)