diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 3839d97..a4bc4d1 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -17,7 +17,7 @@ sqlite_nostd = { workspace=true } bytes = { version = "1.4", default-features = false } num-traits = { version = "0.2.15", default-features = false } num-derive = "0.3" -serde_json = { version = "1.0", default-features = false, features = ["alloc"] } +serde_json = { version = "1.0", default-features = false, features = ["alloc", "raw_value"] } serde = { version = "1.0", default-features = false, features = ["alloc", "derive", "rc"] } const_format = "0.2.34" futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] } diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index 2eb7225..5da65c1 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -3,23 +3,33 @@ extern crate alloc; use alloc::boxed::Box; use alloc::string::String; use const_format::formatcp; -use core::ffi::{c_char, c_int, c_void}; +use core::ffi::{c_char, c_int, c_void, CStr}; +use core::ptr::null_mut; +use serde::Serialize; +use serde_json::value::RawValue; use sqlite::{Connection, ResultCode, Value}; -use sqlite_nostd as sqlite; use sqlite_nostd::ManagedStmt; -use sqlite_nostd::ResultCode::NULL; +use sqlite_nostd::{self as sqlite, ColumnType}; use crate::error::SQLiteError; use crate::ext::SafeManagedStmt; use crate::schema::TableInfoFlags; +use crate::util::MAX_OP_ID; use crate::vtab_util::*; +const MANUAL_NAME: &CStr = c"powersync_crud_"; +const SIMPLE_NAME: &CStr = c"powersync_crud"; + // Structure: // CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN); +// CREATE TABLE powersync_crud(op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN); // // This is a insert-only virtual table. It generates transaction ids in ps_tx, and inserts data in // ps_crud(tx_id, data). 
+// The second form (without the trailing underscore) takes the data to insert as individual +// components and constructs the data to insert into `ps_crud` internally. It will also update +// `ps_updated_rows` and the `$local` bucket. // // Using a virtual table like this allows us to hook into xBegin, xCommit and xRollback to automatically // increment transaction ids. These are only called when powersync_crud_ is used as part of a transaction, @@ -29,22 +39,201 @@ use crate::vtab_util::*; struct VirtualTable { base: sqlite::vtab, db: *mut sqlite::sqlite3, - current_tx: Option<i64>, - insert_statement: Option<ManagedStmt>, + current_tx: Option<ActiveCrudTransaction>, + is_simple: bool, +} + +struct ActiveCrudTransaction { + tx_id: i64, + mode: CrudTransactionMode, +} + +enum CrudTransactionMode { + Manual { + stmt: ManagedStmt, + }, + Simple { + stmt: ManagedStmt, + set_updated_rows: ManagedStmt, + update_local_bucket: ManagedStmt, + }, +} + +impl VirtualTable { + fn value_to_json<'a>(value: &'a *mut sqlite::value) -> Option<&'a RawValue> { + match value.value_type() { + ColumnType::Text => { + Some(unsafe { + // Safety: RawValue is a transparent type wrapping a str. We assume that it + // contains valid JSON. 
+ core::mem::transmute::<&'a str, &'a RawValue>(value.text()) + }) + } + _ => None, + } + } + + fn handle_insert(&self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> { + let current_tx = self + .current_tx + .as_ref() + .ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?; + + match &current_tx.mode { + CrudTransactionMode::Manual { stmt } => { + // Columns are (data TEXT, options INT HIDDEN) + let data = args[0].text(); + let flags = match args[1].value_type() { + sqlite_nostd::ColumnType::Null => TableInfoFlags::default(), + _ => TableInfoFlags(args[1].int() as u32), + }; + + stmt.bind_int64(1, current_tx.tx_id)?; + stmt.bind_text(2, data, sqlite::Destructor::STATIC)?; + stmt.bind_int(3, flags.0 as i32)?; + stmt.exec()?; + } + CrudTransactionMode::Simple { + stmt, + set_updated_rows, + update_local_bucket, + } => { + // Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN) + let flags = match args[6].value_type() { + sqlite_nostd::ColumnType::Null => TableInfoFlags::default(), + _ => TableInfoFlags(args[6].int() as u32), + }; + let op = args[0].text(); + let id = args[1].text(); + let row_type = args[2].text(); + let metadata = args[5]; + let data = Self::value_to_json(&args[3]); + + if flags.ignore_empty_update() + && op == "PATCH" + && data.map(|r| r.get()) == Some("{}") + { + // Ignore this empty update + return Ok(()); + } + + #[derive(Serialize)] + struct CrudEntry<'a> { + op: &'a str, + id: &'a str, + #[serde(rename = "type")] + row_type: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option<&'a RawValue>, + #[serde(skip_serializing_if = "Option::is_none")] + old: Option<&'a RawValue>, + #[serde(skip_serializing_if = "Option::is_none")] + metadata: Option<&'a str>, + } + + // First, we insert into ps_crud like the manual vtab would too. We have to create + // the JSON out of the individual components for that. 
+ stmt.bind_int64(1, current_tx.tx_id)?; + + let serialized = serde_json::to_string(&CrudEntry { + op, + id, + row_type, + data: data, + old: Self::value_to_json(&args[4]), + metadata: if metadata.value_type() == ColumnType::Text { + Some(metadata.text()) + } else { + None + }, + })?; + stmt.bind_text(2, &serialized, sqlite::Destructor::STATIC)?; + stmt.exec()?; + + // However, we also set ps_updated_rows and update the $local bucket + set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?; + set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?; + set_updated_rows.exec()?; + update_local_bucket.exec()?; + } + } + + Ok(()) + } + + fn begin(&mut self) -> Result<(), SQLiteError> { + let db = self.db; + + // language=SQLite + let statement = + db.prepare_v2("UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx")?; + let tx_id = if statement.step()? == ResultCode::ROW { + statement.column_int64(0) - 1 + } else { + return Err(SQLiteError::from(ResultCode::ABORT)); + }; + + self.current_tx = Some(ActiveCrudTransaction { + tx_id, + mode: if self.is_simple { + CrudTransactionMode::Simple { + // language=SQLite + stmt: db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)?, + // language=SQLite + set_updated_rows: db.prepare_v3( + "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)", + 0, + )?, + update_local_bucket: db.prepare_v3(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"), 0)?, + } + } else { + const SQL: &str = formatcp!( + "\ +WITH insertion (tx_id, data) AS (VALUES (?1, ?2)) +INSERT INTO ps_crud(tx_id, data) +SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}'; + ", + TableInfoFlags::IGNORE_EMPTY_UPDATE + ); + + let insert_statement = db.prepare_v3(SQL, 0)?; + CrudTransactionMode::Manual { + stmt: insert_statement, + } + }, + }); + + Ok(()) + } + + fn end_transaction(&mut self) { + 
self.current_tx = None; + } } extern "C" fn connect( db: *mut sqlite::sqlite3, _aux: *mut c_void, - _argc: c_int, - _argv: *const *const c_char, + argc: c_int, + argv: *const *const c_char, vtab: *mut *mut sqlite::vtab, _err: *mut *mut c_char, ) -> c_int { - if let Err(rc) = sqlite::declare_vtab( - db, - "CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);", - ) { + let args = sqlite::args!(argc, argv); + let Some(name) = args.get(0) else { + return ResultCode::MISUSE as c_int; + }; + + let name = unsafe { CStr::from_ptr(*name) }; + let is_simple = name == SIMPLE_NAME; + + let sql = if is_simple { + "CREATE TABLE powersync_crud(op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN);" + } else { + "CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);" + }; + + if let Err(rc) = sqlite::declare_vtab(db, sql) { return rc as c_int; } @@ -57,7 +246,7 @@ extern "C" fn connect( }, db, current_tx: None, - insert_statement: None, + is_simple, })); *vtab = tab.cast::<sqlite::vtab>(); let _ = sqlite::vtab_config(db, 0); @@ -72,81 +261,25 @@ extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int { ResultCode::OK as c_int } -fn begin_impl(tab: &mut VirtualTable) -> Result<(), SQLiteError> { - let db = tab.db; - - const SQL: &str = formatcp!( - "\ -WITH insertion (tx_id, data) AS (VALUES (?1, ?2)) -INSERT INTO ps_crud(tx_id, data) -SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}'; - ", - TableInfoFlags::IGNORE_EMPTY_UPDATE - ); - - let insert_statement = db.prepare_v3(SQL, 0)?; - tab.insert_statement = Some(insert_statement); - - // language=SQLite - let statement = - db.prepare_v2("UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx")?; - if statement.step()? 
== ResultCode::ROW { - let tx_id = statement.column_int64(0) - 1; - tab.current_tx = Some(tx_id); - } else { - return Err(SQLiteError::from(ResultCode::ABORT)); - } - - Ok(()) -} - extern "C" fn begin(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) }; - let result = begin_impl(tab); + let result = tab.begin(); vtab_result(vtab, result) } extern "C" fn commit(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) }; - tab.current_tx = None; - tab.insert_statement = None; + tab.end_transaction(); ResultCode::OK as c_int } extern "C" fn rollback(vtab: *mut sqlite::vtab) -> c_int { let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) }; - tab.current_tx = None; - tab.insert_statement = None; + tab.end_transaction(); // ps_tx will be rolled back automatically ResultCode::OK as c_int } -fn insert_operation( - vtab: *mut sqlite::vtab, - data: &str, - flags: TableInfoFlags, -) -> Result<(), SQLiteError> { - let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) }; - if tab.current_tx.is_none() { - return Err(SQLiteError( - ResultCode::MISUSE, - Some(String::from("No tx_id")), - )); - } - let current_tx = tab.current_tx.unwrap(); - // language=SQLite - let statement = tab - .insert_statement - .as_ref() - .ok_or(SQLiteError::from(NULL))?; - statement.bind_int64(1, current_tx)?; - statement.bind_text(2, data, sqlite::Destructor::STATIC)?; - statement.bind_int(3, flags.0 as i32)?; - statement.exec()?; - - Ok(()) -} - extern "C" fn update( vtab: *mut sqlite::vtab, argc: c_int, @@ -162,12 +295,8 @@ extern "C" fn update( ResultCode::MISUSE as c_int } else if rowid.value_type() == sqlite::ColumnType::Null { // INSERT - let data = args[2].text(); - let flags = match args[3].value_type() { - sqlite_nostd::ColumnType::Null => TableInfoFlags::default(), - _ => TableInfoFlags(args[3].int() as u32), - }; - let result = insert_operation(vtab, data, flags); + let tab = unsafe { &*(vtab.cast::<VirtualTable>()) }; + let result = tab.handle_insert(&args[2..]); vtab_result(vtab, 
result) } else { // UPDATE - not supported @@ -207,7 +336,20 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module { }; pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { - db.create_module_v2("powersync_crud_", &MODULE, None, None)?; + sqlite::convert_rc(sqlite::create_module_v2( + db, + SIMPLE_NAME.as_ptr(), + &MODULE, + null_mut(), + None, + ))?; + sqlite::convert_rc(sqlite::create_module_v2( + db, + MANUAL_NAME.as_ptr(), + &MODULE, + null_mut(), + None, + ))?; Ok(()) } diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 8fd5357..676fcd8 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -12,7 +12,7 @@ use crate::error::{PSResult, SQLiteError}; use crate::fix_data::apply_v035_fix; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 9; +pub const LATEST_VERSION: i32 = 10; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -370,5 +370,21 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 9') local_db.exec_safe(stmt).into_db_result(local_db)?; } + if current_version < 10 && target_version >= 10 { + // We want to re-create views and triggers because their definition at version 10 and above + // might reference vtabs that don't exist on older versions. These views will be re-created + // by applying the PowerSync user schema after these internal migrations finish. 
+ local_db + .exec_safe( + "\ +INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( + json_object('sql', 'SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated'''), + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 10') +)); + ", + ) + .into_db_result(local_db)?; + } + Ok(()) } diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs index 4224221..6f7b4ea 100644 --- a/crates/core/src/schema/table_info.rs +++ b/crates/core/src/schema/table_info.rs @@ -149,6 +149,10 @@ impl TableInfoFlags { self.0 & Self::INCLUDE_OLD_ONLY_WHEN_CHANGED != 0 } + pub const fn ignore_empty_update(self) -> bool { + self.0 & Self::IGNORE_EMPTY_UPDATE != 0 + } + const fn with_flag(self, flag: u32) -> Self { Self(self.0 | flag) } diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs index 03cbdd8..feed3da 100644 --- a/crates/core/src/views.rs +++ b/crates/core/src/views.rs @@ -84,21 +84,23 @@ fn powersync_trigger_delete_sql_impl( let trigger_name = quote_identifier_prefixed("ps_view_delete_", view_name); let type_string = quote_string(name); - let old_fragment: Cow<'static, str> = match &table_info.diff_include_old { - Some(include_old) => { - let json = match include_old { - DiffIncludeOld::OnlyForColumns { columns } => { - json_object_fragment("OLD", &mut columns.iter().map(|c| c.as_str())) - } - DiffIncludeOld::ForAllColumns => { - json_object_fragment("OLD", &mut table_info.column_names()) - } - }?; - - format!(", 'old', {json}").into() - } - None => "".into(), - }; + let (old_data_name, old_data_value): (&'static str, Cow<'static, str>) = + match &table_info.diff_include_old { + Some(include_old) => { + let mut json = match include_old { + DiffIncludeOld::OnlyForColumns { columns } => { + json_object_fragment("OLD", &mut columns.iter().map(|c| c.as_str())) + } + DiffIncludeOld::ForAllColumns => { + json_object_fragment("OLD", 
&mut table_info.column_names()) + } + }?; + + json.insert(0, ','); + (",old_values", json.into()) + } + None => ("", "".into()), + }; return if !local_only && !insert_only { let mut trigger = format!( @@ -108,9 +110,7 @@ INSTEAD OF DELETE ON {quoted_name} FOR EACH ROW BEGIN DELETE FROM {internal_name} WHERE id = OLD.id; -INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'DELETE', 'type', {type_string}, 'id', OLD.id{old_fragment})); -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES({type_string}, OLD.id); -INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID}); +INSERT INTO powersync_crud(op,id,type{old_data_name}) VALUES ('DELETE',OLD.id,{type_string}{old_data_value}); END" ); @@ -126,9 +126,7 @@ FOR EACH ROW WHEN NEW._deleted IS TRUE BEGIN DELETE FROM {internal_name} WHERE id = NEW.id; -INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'DELETE', 'type', {type_string}, 'id', NEW.id{old_fragment}, 'metadata', NEW._metadata)); -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES({type_string}, NEW.id); -INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID}); +INSERT INTO powersync_crud(op,id,type,metadata{old_data_name}) VALUES ('DELETE',NEW.id,{type_string},NEW._metadata{old_data_value}); END" ).expect("writing to string should be infallible"); } @@ -178,10 +176,10 @@ fn powersync_trigger_insert_sql_impl( let json_fragment = json_object_fragment("NEW", &mut table_info.column_names())?; - let metadata_fragment = if table_info.flags.include_metadata() { - ", 'metadata', NEW._metadata" + let (metadata_key, metadata_value) = if table_info.flags.include_metadata() { + (",metadata", ",NEW._metadata") } else { - "" + ("", "") }; return if !local_only && !insert_only { @@ -196,12 +194,9 @@ fn powersync_trigger_insert_sql_impl( WHEN (typeof(NEW.id) != 'text') THEN RAISE (FAIL, 'id should be text') END; - INSERT INTO {internal_name} - SELECT NEW.id, 
{json_fragment}; - INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', {:}, 'id', NEW.id, 'data', json(powersync_diff('{{}}', {:})){metadata_fragment})); - INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES({type_string}, NEW.id); - INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID}); - END", type_string, json_fragment); + INSERT INTO {internal_name} SELECT NEW.id, {json_fragment}; + INSERT INTO powersync_crud(op,id,type,data{metadata_key}) VALUES ('PUT',NEW.id,{type_string},json(powersync_diff('{{}}', {:})){metadata_value}); + END", json_fragment); Ok(trigger) } else if local_only { let trigger = format!( @@ -215,6 +210,8 @@ fn powersync_trigger_insert_sql_impl( ); Ok(trigger) } else if insert_only { + // This is using the manual powersync_crud_ instead of powersync_crud because insert-only + // writes shouldn't prevent us from receiving new data. let trigger = format!("\ CREATE TRIGGER {trigger_name} INSTEAD OF INSERT ON {quoted_name} @@ -282,15 +279,15 @@ fn powersync_trigger_update_sql_impl( } } - let old_fragment: Cow<'static, str> = match old_values_fragment { - Some(f) => format!(", 'old', {f}").into(), - None => "".into(), + let (old_key, old_value): (&'static str, Cow<'static, str>) = match old_values_fragment { + Some(f) => (",old_values", format!(",{f}").into()), + None => ("", "".into()), }; - let metadata_fragment = if table_info.flags.include_metadata() { - ", 'metadata', NEW._metadata" + let (metadata_key, metadata_value) = if table_info.flags.include_metadata() { + (",metadata", ",NEW._metadata") } else { - "" + ("", "") }; return if !local_only && !insert_only { @@ -316,10 +313,8 @@ BEGIN UPDATE {internal_name} SET data = {json_fragment_new} WHERE id = NEW.id; - INSERT INTO powersync_crud_(data, options) VALUES(json_object('op', 'PATCH', 'type', {:}, 'id', NEW.id, 'data', json(powersync_diff({:}, {:})){:}{:}), {flags}); - INSERT OR IGNORE INTO ps_updated_rows(row_type, 
row_id) VALUES({type_string}, NEW.id); - INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID}); -END", type_string, json_fragment_old, json_fragment_new, old_fragment, metadata_fragment); + INSERT INTO powersync_crud(op,type,id,data,options{old_key}{metadata_key}) VALUES ('PATCH',{type_string},NEW.id,json(powersync_diff({:}, {:})),{flags}{old_value}{metadata_value}); +END", json_fragment_old, json_fragment_new); Ok(trigger) } else if local_only { debug_assert!(!table_info.flags.include_metadata()); diff --git a/dart/test/crud_test.dart b/dart/test/crud_test.dart index 8a7991d..8c50c2a 100644 --- a/dart/test/crud_test.dart +++ b/dart/test/crud_test.dart @@ -227,6 +227,104 @@ void main() { }); } + group('crud vtab', () { + setUp(() { + db.select('select powersync_init()'); + }); + + group('simple', () { + test('can insert', () { + db.execute( + 'INSERT INTO powersync_crud (op, id, type, data) VALUES (?, ?, ?, ?)', + [ + 'PUT', + 'foo', + 'users', + json.encode({'my': 'value'}) + ]); + + final [row] = db.select('SELECT * FROM ps_crud'); + expect(row, { + 'id': 1, + 'tx_id': 1, + 'data': + '{"op":"PUT","id":"foo","type":"users","data":{"my":"value"}}', + }); + }); + + test('updates local bucket and updated rows', () { + db.execute( + 'INSERT INTO powersync_crud (op, id, type, data) VALUES (?, ?, ?, ?)', + [ + 'PUT', + 'foo', + 'users', + json.encode({'my': 'value'}) + ]); + + expect(db.select('SELECT * FROM ps_updated_rows'), [ + {'row_type': 'users', 'row_id': 'foo'} + ]); + expect(db.select('SELECT * FROM ps_buckets'), [ + allOf( + containsPair('name', r'$local'), + containsPair('target_op', 9223372036854775807), + ) + ]); + }); + + test('does not require data', () { + db.execute( + 'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [ + 'DELETE', + 'foo', + 'users', + ]); + + final [row] = db.select('SELECT * FROM ps_crud'); + expect(row, { + 'id': 1, + 'tx_id': 1, + 'data': 
'{"op":"DELETE","id":"foo","type":"users"}', + }); + }); + + test('can insert metadata', () { + db.execute( + 'INSERT INTO powersync_crud (op, id, type, metadata) VALUES (?, ?, ?, ?)', + ['DELETE', 'foo', 'users', 'my metadata']); + + final [row] = db.select('SELECT * FROM ps_crud'); + expect(row, { + 'id': 1, + 'tx_id': 1, + 'data': + '{"op":"DELETE","id":"foo","type":"users","metadata":"my metadata"}', + }); + }); + + test('can insert old data', () { + db.execute( + 'INSERT INTO powersync_crud (op, id, type, data, old_values) VALUES (?, ?, ?, ?, ?)', + [ + 'PUT', + 'foo', + 'users', + json.encode({'my': 'value'}), + json.encode({'previous': 'value'}) + ]); + + final [row] = db.select('SELECT * FROM ps_crud'); + expect(row, { + 'id': 1, + 'tx_id': 1, + 'data': + '{"op":"PUT","id":"foo","type":"users","data":{"my":"value"},"old":{"previous":"value"}}', + }); + }); + }); + }); + group('tracks previous values', () { void createTable([Map options = const {}]) { final tableSchema = { diff --git a/dart/test/migration_test.dart b/dart/test/migration_test.dart index a149ada..b18d166 100644 --- a/dart/test/migration_test.dart +++ b/dart/test/migration_test.dart @@ -119,7 +119,7 @@ void main() { final schema = getSchema(db); final expected = - '${fixtures.finalState.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schema5.trim()}'; + '${fixtures.finalState.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.currentDeveloperSchema.trim()}'; expect(schema, equals(expected)); }); diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 283885f..ce8b2b8 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 9; +const databaseVersion = 10; /// This is the base database state that we expect at various schema versions. 
/// Generated by loading the specific library version, and exporting the schema. @@ -307,6 +307,53 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +''', + 10: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE 
ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, 
last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM 
ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') ''', }; @@ -398,6 +445,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 10: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -442,6 +500,7 @@ final dataDown1 = { 6: data1[5]!, 7: data1[5]!, 8: data1[5]!, + 9: data1[9]!, }; final finalData1 = data1[databaseVersion]!; @@ -527,9 +586,7 @@ INSTEAD OF DELETE ON "lists" FOR EACH ROW BEGIN DELETE FROM "ps_data__lists" WHERE id = OLD.id; -INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'DELETE', 'type', 'lists', 'id', OLD.id)); -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES('lists', OLD.id); -INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, 9223372036854775807); +INSERT INTO powersync_crud(op,id,type) VALUES ('DELETE',OLD.id,'lists'); END ;CREATE TRIGGER "ps_view_insert_lists" INSTEAD OF INSERT ON "lists" @@ -541,11 
+598,46 @@ END WHEN (typeof(NEW.id) != 'text') THEN RAISE (FAIL, 'id should be text') END; - INSERT INTO "ps_data__lists" - SELECT NEW.id, json_object('description', NEW."description"); - INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', 'lists', 'id', NEW.id, 'data', json(powersync_diff('{}', json_object('description', NEW."description"))))); - INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES('lists', NEW.id); - INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, 9223372036854775807); + INSERT INTO "ps_data__lists" SELECT NEW.id, json_object('description', NEW."description"); + INSERT INTO powersync_crud(op,id,type,data) VALUES ('PUT',NEW.id,'lists',json(powersync_diff('{}', json_object('description', NEW."description")))); + END +;CREATE TRIGGER "ps_view_update_lists" +INSTEAD OF UPDATE ON "lists" +FOR EACH ROW +BEGIN + SELECT CASE + WHEN (OLD.id != NEW.id) + THEN RAISE (FAIL, 'Cannot update id') + END; + UPDATE "ps_data__lists" + SET data = json_object('description', NEW."description") + WHERE id = NEW.id; + INSERT INTO powersync_crud(op,type,id,data,options) VALUES ('PATCH','lists',NEW.id,json(powersync_diff(json_object('description', OLD."description"), json_object('description', NEW."description"))),0); +END +'''; + +const currentDeveloperSchema = r''' +;CREATE TABLE "ps_data__lists"(id TEXT PRIMARY KEY NOT NULL, data TEXT) +;CREATE VIEW "lists"("id", "description") AS SELECT id, CAST(json_extract(data, '$.description') as TEXT) FROM "ps_data__lists" -- powersync-auto-generated +;CREATE TRIGGER "ps_view_delete_lists" +INSTEAD OF DELETE ON "lists" +FOR EACH ROW +BEGIN +DELETE FROM "ps_data__lists" WHERE id = OLD.id; +INSERT INTO powersync_crud(op,id,type) VALUES ('DELETE',OLD.id,'lists'); +END +;CREATE TRIGGER "ps_view_insert_lists" + INSTEAD OF INSERT ON "lists" + FOR EACH ROW + BEGIN + SELECT CASE + WHEN (NEW.id IS NULL) + THEN RAISE (FAIL, 'id is required') + WHEN (typeof(NEW.id) != 'text') + 
THEN RAISE (FAIL, 'id should be text') + END; + INSERT INTO "ps_data__lists" SELECT NEW.id, json_object('description', NEW."description"); + INSERT INTO powersync_crud(op,id,type,data) VALUES ('PUT',NEW.id,'lists',json(powersync_diff('{}', json_object('description', NEW."description")))); END ;CREATE TRIGGER "ps_view_update_lists" INSTEAD OF UPDATE ON "lists" @@ -558,8 +650,6 @@ BEGIN UPDATE "ps_data__lists" SET data = json_object('description', NEW."description") WHERE id = NEW.id; - INSERT INTO powersync_crud_(data, options) VALUES(json_object('op', 'PATCH', 'type', 'lists', 'id', NEW.id, 'data', json(powersync_diff(json_object('description', OLD."description"), json_object('description', NEW."description")))), 0); - INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES('lists', NEW.id); - INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, 9223372036854775807); + INSERT INTO powersync_crud(op,type,id,data,options) VALUES ('PATCH','lists',NEW.id,json(powersync_diff(json_object('description', OLD."description"), json_object('description', NEW."description"))),0); END ''';