diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index 5da65c16..d5363901 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -49,14 +49,20 @@ struct ActiveCrudTransaction { } enum CrudTransactionMode { - Manual { - stmt: ManagedStmt, - }, - Simple { - stmt: ManagedStmt, - set_updated_rows: ManagedStmt, - update_local_bucket: ManagedStmt, - }, + Manual(ManualCrudTransactionMode), + Simple(SimpleCrudTransactionMode), +} + +#[derive(Default)] +struct ManualCrudTransactionMode { + stmt: Option, +} + +#[derive(Default)] +struct SimpleCrudTransactionMode { + stmt: Option, + set_updated_rows: Option, + had_writes: bool, } impl VirtualTable { @@ -73,14 +79,15 @@ impl VirtualTable { } } - fn handle_insert(&self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> { + fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> { let current_tx = self .current_tx - .as_ref() + .as_mut() .ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?; + let db = self.db; - match ¤t_tx.mode { - CrudTransactionMode::Manual { stmt } => { + match &mut current_tx.mode { + CrudTransactionMode::Manual(manual) => { // Columns are (data TEXT, options INT HIDDEN) let data = args[0].text(); let flags = match args[1].value_type() { @@ -88,16 +95,13 @@ impl VirtualTable { _ => TableInfoFlags(args[1].int() as u32), }; + let stmt = manual.raw_crud_statement(db)?; stmt.bind_int64(1, current_tx.tx_id)?; stmt.bind_text(2, data, sqlite::Destructor::STATIC)?; stmt.bind_int(3, flags.0 as i32)?; stmt.exec()?; } - CrudTransactionMode::Simple { - stmt, - set_updated_rows, - update_local_bucket, - } => { + CrudTransactionMode::Simple(simple) => { // Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN) let flags = match args[6].value_type() { sqlite_nostd::ColumnType::Null => TableInfoFlags::default(), @@ -133,6 +137,7 @@ impl VirtualTable { // First, we insert into ps_crud like the manual vtab would too. We have to create // the JSON out of the individual components for that. + let stmt = simple.raw_crud_statement(db)?; stmt.bind_int64(1, current_tx.tx_id)?; let serialized = serde_json::to_string(&CrudEntry { @@ -151,10 +156,11 @@ impl VirtualTable { stmt.exec()?; // However, we also set ps_updated_rows and update the $local bucket + let set_updated_rows = simple.set_updated_rows_statement(db)?; set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?; set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?; set_updated_rows.exec()?; - update_local_bucket.exec()?; + simple.record_local_write(db)?; } } @@ -176,39 +182,78 @@ impl VirtualTable { self.current_tx = Some(ActiveCrudTransaction { tx_id, mode: if self.is_simple { - CrudTransactionMode::Simple { - // language=SQLite - stmt: db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)?, - // language=SQLite - set_updated_rows: db.prepare_v3( - "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)", - 0, - )?, - update_local_bucket: db.prepare_v3(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"), 0)?, - } + CrudTransactionMode::Simple(Default::default()) } else { - const SQL: &str = formatcp!( - "\ + CrudTransactionMode::Manual(Default::default()) + }, + }); + + Ok(()) + } + + fn end_transaction(&mut self) { + self.current_tx = None; + } +} + +impl ManualCrudTransactionMode { + fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> { + prepare_lazy(&mut self.stmt, || { + const SQL: &str = formatcp!( + "\ WITH insertion (tx_id, data) AS (VALUES (?1, ?2)) INSERT INTO ps_crud(tx_id, data) SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}'; ", - TableInfoFlags::IGNORE_EMPTY_UPDATE - ); + TableInfoFlags::IGNORE_EMPTY_UPDATE + ); - let insert_statement = db.prepare_v3(SQL, 0)?; - CrudTransactionMode::Manual { - stmt: insert_statement, - } - }, - }); + db.prepare_v3(SQL, 0) + }) + } +} + +impl SimpleCrudTransactionMode { + fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> { + prepare_lazy(&mut self.stmt, || { + // language=SQLite + db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0) + }) + } + + fn set_updated_rows_statement( + &mut self, + db: *mut sqlite::sqlite3, + ) -> Result<&ManagedStmt, ResultCode> { + prepare_lazy(&mut self.set_updated_rows, || { + // language=SQLite + db.prepare_v3( + "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)", + 0, + ) + }) + } + + fn record_local_write(&mut self, db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + if !self.had_writes { + db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?; + self.had_writes = true; + } Ok(()) } +} - fn end_transaction(&mut self) { - self.current_tx = None; +/// A variant of `Option.get_or_insert` that handles insertions returning errors. +fn prepare_lazy( + stmt: &mut Option, + prepare: impl FnOnce() -> Result, +) -> Result<&ManagedStmt, ResultCode> { + if let None = stmt { + *stmt = Some(prepare()?); } + + return Ok(unsafe { stmt.as_ref().unwrap_unchecked() }); } extern "C" fn connect( @@ -295,7 +340,7 @@ extern "C" fn update( ResultCode::MISUSE as c_int } else if rowid.value_type() == sqlite::ColumnType::Null { // INSERT - let tab = unsafe { &*(vtab.cast::()) }; + let tab = unsafe { &mut *(vtab.cast::()) }; let result = tab.handle_insert(&args[2..]); vtab_result(vtab, result) } else { diff --git a/dart/test/crud_test.dart b/dart/test/crud_test.dart index 8c50c2ad..558997df 100644 --- a/dart/test/crud_test.dart +++ b/dart/test/crud_test.dart @@ -322,6 +322,51 @@ void main() { '{"op":"PUT","id":"foo","type":"users","data":{"my":"value"},"old":{"previous":"value"}}', }); }); + + test('resets state after commit', () { + db.execute('BEGIN'); + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + db.execute('commit'); + + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + expect(db.select('SELECT * FROM ps_crud').map((r) => r['tx_id']), + [1, 2]); + }); + + test('resets state after rollback', () { + db.execute('BEGIN'); + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + db.execute('rollback'); + + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo2', + 'users', + ]); + expect(db.select('SELECT * FROM ps_crud'), [ + { + 'id': 1, + 'data': '{"op":"DELETE","id":"foo2","type":"users"}', + 'tx_id': 1, + } + ]); + }); }); });