From 2f6dea4cb818173fc4cdf3402d2b865e2d7d21a0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 23 Jun 2025 11:42:28 -0400 Subject: [PATCH 1/3] New crud vtab: Only update `$local` bucket once --- crates/core/src/crud_vtab.rs | 46 ++++++++++++++++++++++++++---------- dart/test/crud_test.dart | 45 +++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 12 deletions(-) diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index 5da65c16..741d488d 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -55,7 +55,7 @@ enum CrudTransactionMode { Simple { stmt: ManagedStmt, set_updated_rows: ManagedStmt, - update_local_bucket: ManagedStmt, + had_writes: bool, }, } @@ -73,13 +73,13 @@ impl VirtualTable { } } - fn handle_insert(&self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> { + fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> { let current_tx = self .current_tx - .as_ref() + .as_mut() .ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?; - match ¤t_tx.mode { + match &mut current_tx.mode { CrudTransactionMode::Manual { stmt } => { // Columns are (data TEXT, options INT HIDDEN) let data = args[0].text(); @@ -96,7 +96,7 @@ impl VirtualTable { CrudTransactionMode::Simple { stmt, set_updated_rows, - update_local_bucket, + had_writes, } => { // Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN) let flags = match args[6].value_type() { @@ -154,7 +154,7 @@ impl VirtualTable { set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?; set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?; set_updated_rows.exec()?; - update_local_bucket.exec()?; + *had_writes = true; } } @@ -184,7 +184,7 @@ impl VirtualTable { "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)", 0, )?, - update_local_bucket: db.prepare_v3(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"), 0)?, + had_writes: false, } } else { const SQL: &str = formatcp!( @@ -206,7 +206,30 @@ SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data- Ok(()) } - fn end_transaction(&mut self) { + fn end_transaction(&mut self) -> Result<(), SQLiteError> { + let tx = self.current_tx.take(); + if let Some(tx) = tx { + let needs_local_bucket_update = match tx.mode { + CrudTransactionMode::Manual { .. } => { + // In manual mode, users need to update the $local bucket themselves. + false + } + CrudTransactionMode::Simple { + had_writes, + stmt: _, + set_updated_rows: _, + } => had_writes, + }; + + if needs_local_bucket_update { + self.db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?; + } + } + + Ok(()) + } + + fn clear_transaction_state(&mut self) { self.current_tx = None; } } @@ -269,13 +292,12 @@ extern "C" fn begin(vtab: *mut sqlite::vtab) -> c_int { extern "C" fn commit(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::()) }; - tab.end_transaction(); - ResultCode::OK as c_int + vtab_result(vtab, tab.end_transaction()) } extern "C" fn rollback(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::()) }; - tab.end_transaction(); + tab.clear_transaction_state(); // ps_tx will be rolled back automatically ResultCode::OK as c_int } @@ -295,7 +317,7 @@ extern "C" fn update( ResultCode::MISUSE as c_int } else if rowid.value_type() == sqlite::ColumnType::Null { // INSERT - let tab = unsafe { &*(vtab.cast::()) }; + let tab = unsafe { &mut *(vtab.cast::()) }; let result = tab.handle_insert(&args[2..]); vtab_result(vtab, result) } else { diff --git a/dart/test/crud_test.dart b/dart/test/crud_test.dart index 8c50c2ad..558997df 100644 --- a/dart/test/crud_test.dart +++ b/dart/test/crud_test.dart @@ -322,6 +322,51 @@ void main() { '{"op":"PUT","id":"foo","type":"users","data":{"my":"value"},"old":{"previous":"value"}}', }); }); + + test('resets state after commit', () { + db.execute('BEGIN'); + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + db.execute('commit'); + + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + expect(db.select('SELECT * FROM ps_crud').map((r) => r['tx_id']), + [1, 2]); + }); + + test('resets state after rollback', () { + db.execute('BEGIN'); + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + db.execute('rollback'); + + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo2', + 'users', + ]); + expect(db.select('SELECT * FROM ps_crud'), [ + { + 'id': 1, + 'data': '{"op":"DELETE","id":"foo2","type":"users"}', + 'tx_id': 1, + } + ]); + }); }); }); From 8b20c4b0750f2a44b53a002b7b74a67fdac06fe5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 23 Jun 2025 12:00:41 -0400 Subject: [PATCH 2/3] Prepare on demand --- crates/core/src/crud_vtab.rs | 118 ++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 43 deletions(-) diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index 741d488d..beb15b4c 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -49,14 +49,20 @@ struct ActiveCrudTransaction { } enum CrudTransactionMode { - Manual { - stmt: ManagedStmt, - }, - Simple { - stmt: ManagedStmt, - set_updated_rows: ManagedStmt, - had_writes: bool, - }, + Manual(ManualCrudTransactionMode), + Simple(SimpleCrudTransactionMode), +} + +#[derive(Default)] +struct ManualCrudTransactionMode { + stmt: Option, +} + +#[derive(Default)] +struct SimpleCrudTransactionMode { + stmt: Option, + set_updated_rows: Option, + had_writes: bool, } impl VirtualTable { @@ -78,9 +84,10 @@ impl VirtualTable { .current_tx .as_mut() .ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?; + let db = self.db; match &mut current_tx.mode { - CrudTransactionMode::Manual { stmt } => { + CrudTransactionMode::Manual(manual) => { // Columns are (data TEXT, options INT HIDDEN) let data = args[0].text(); let flags = match args[1].value_type() { @@ -88,16 +95,13 @@ impl VirtualTable { _ => TableInfoFlags(args[1].int() as u32), }; + let stmt = manual.raw_crud_statement(db)?; stmt.bind_int64(1, current_tx.tx_id)?; stmt.bind_text(2, data, sqlite::Destructor::STATIC)?; stmt.bind_int(3, flags.0 as i32)?; stmt.exec()?; } - CrudTransactionMode::Simple { - stmt, - set_updated_rows, - had_writes, - } => { + CrudTransactionMode::Simple(simple) => { // Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN) let flags = match args[6].value_type() { sqlite_nostd::ColumnType::Null => TableInfoFlags::default(), @@ -133,6 +137,7 @@ impl VirtualTable { // First, we insert into ps_crud like the manual vtab would too. We have to create // the JSON out of the individual components for that. + let stmt = simple.raw_crud_statement(db)?; stmt.bind_int64(1, current_tx.tx_id)?; let serialized = serde_json::to_string(&CrudEntry { @@ -151,10 +156,11 @@ impl VirtualTable { stmt.exec()?; // However, we also set ps_updated_rows and update the $local bucket + let set_updated_rows = simple.set_updated_rows_statement(db)?; set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?; set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?; set_updated_rows.exec()?; - *had_writes = true; + simple.had_writes = true; } } @@ -176,30 +182,9 @@ impl VirtualTable { self.current_tx = Some(ActiveCrudTransaction { tx_id, mode: if self.is_simple { - CrudTransactionMode::Simple { - // language=SQLite - stmt: db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)?, - // language=SQLite - set_updated_rows: db.prepare_v3( - "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)", - 0, - )?, - had_writes: false, - } + CrudTransactionMode::Simple(Default::default()) } else { - const SQL: &str = formatcp!( - "\ -WITH insertion (tx_id, data) AS (VALUES (?1, ?2)) -INSERT INTO ps_crud(tx_id, data) -SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}'; - ", - TableInfoFlags::IGNORE_EMPTY_UPDATE - ); - - let insert_statement = db.prepare_v3(SQL, 0)?; - CrudTransactionMode::Manual { - stmt: insert_statement, - } + CrudTransactionMode::Manual(Default::default()) }, }); @@ -214,11 +199,7 @@ SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data- // In manual mode, users need to update the $local bucket themselves. false } - CrudTransactionMode::Simple { - had_writes, - stmt: _, - set_updated_rows: _, - } => had_writes, + CrudTransactionMode::Simple(simple) => simple.had_writes, }; if needs_local_bucket_update { @@ -234,6 +215,57 @@ SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data- } } +impl ManualCrudTransactionMode { + fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> { + prepare_lazy(&mut self.stmt, || { + const SQL: &str = formatcp!( + "\ +WITH insertion (tx_id, data) AS (VALUES (?1, ?2)) +INSERT INTO ps_crud(tx_id, data) +SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}'; + ", + TableInfoFlags::IGNORE_EMPTY_UPDATE + ); + + db.prepare_v3(SQL, 0) + }) + } +} + +impl SimpleCrudTransactionMode { + fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> { + prepare_lazy(&mut self.stmt, || { + // language=SQLite + db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0) + }) + } + + fn set_updated_rows_statement( + &mut self, + db: *mut sqlite::sqlite3, + ) -> Result<&ManagedStmt, ResultCode> { + prepare_lazy(&mut self.set_updated_rows, || { + // language=SQLite + db.prepare_v3( + "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)", + 0, + ) + }) + } +} + +/// A variant of `Option.get_or_insert` that handles insertions returning errors. +fn prepare_lazy( + stmt: &mut Option, + prepare: impl FnOnce() -> Result, +) -> Result<&ManagedStmt, ResultCode> { + if let None = stmt { + *stmt = Some(prepare()?); + } + + return Ok(unsafe { stmt.as_ref().unwrap_unchecked() }); +} + extern "C" fn connect( db: *mut sqlite::sqlite3, _aux: *mut c_void, From 05833857173fc77256cd38dc0fbf8baf0605354d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 23 Jun 2025 12:25:37 -0400 Subject: [PATCH 3/3] Update local on first write --- crates/core/src/crud_vtab.rs | 37 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index beb15b4c..d5363901 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -160,7 +160,7 @@ impl VirtualTable { set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?; set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?; set_updated_rows.exec()?; - simple.had_writes = true; + simple.record_local_write(db)?; } } @@ -191,26 +191,7 @@ impl VirtualTable { Ok(()) } - fn end_transaction(&mut self) -> Result<(), SQLiteError> { - let tx = self.current_tx.take(); - if let Some(tx) = tx { - let needs_local_bucket_update = match tx.mode { - CrudTransactionMode::Manual { .. } => { - // In manual mode, users need to update the $local bucket themselves. - false - } - CrudTransactionMode::Simple(simple) => simple.had_writes, - }; - - if needs_local_bucket_update { - self.db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?; - } - } - - Ok(()) - } - - fn clear_transaction_state(&mut self) { + fn end_transaction(&mut self) { self.current_tx = None; } } @@ -252,6 +233,15 @@ impl SimpleCrudTransactionMode { ) }) } + + fn record_local_write(&mut self, db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + if !self.had_writes { + db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?; + self.had_writes = true; + } + + Ok(()) + } } /// A variant of `Option.get_or_insert` that handles insertions returning errors. @@ -324,12 +314,13 @@ extern "C" fn begin(vtab: *mut sqlite::vtab) -> c_int { extern "C" fn commit(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::()) }; - vtab_result(vtab, tab.end_transaction()) + tab.end_transaction(); + ResultCode::OK as c_int } extern "C" fn rollback(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::()) }; - tab.clear_transaction_state(); + tab.end_transaction(); // ps_tx will be rolled back automatically ResultCode::OK as c_int }