From d6ed25b91de410843a43a6286581bb77879fd283 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 3 Jun 2025 22:11:26 +0200 Subject: [PATCH 1/8] Allow using external tables --- crates/core/src/schema/mod.rs | 9 +- crates/core/src/schema/table_info.rs | 22 ++ crates/core/src/sync/interface.rs | 3 + crates/core/src/sync/storage_adapter.rs | 9 +- crates/core/src/sync/streaming_sync.rs | 32 ++- crates/core/src/sync_local.rs | 297 +++++++++++++++++++----- crates/core/src/util.rs | 4 - 7 files changed, 297 insertions(+), 79 deletions(-) diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index 96fb732..a4e7309 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -5,11 +5,14 @@ use alloc::vec::Vec; use serde::Deserialize; use sqlite::ResultCode; use sqlite_nostd as sqlite; -pub use table_info::{DiffIncludeOld, Table, TableInfoFlags}; +pub use table_info::{ + DiffIncludeOld, PendingStatement, PendingStatementValue, RawTableDefinition, Table, + TableInfoFlags, +}; -#[derive(Deserialize)] +#[derive(Deserialize, Default)] pub struct Schema { - tables: Vec, + pub tables: Vec, } pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs index 6f7b4ea..4da8d41 100644 --- a/crates/core/src/schema/table_info.rs +++ b/crates/core/src/schema/table_info.rs @@ -17,6 +17,8 @@ pub struct Table { pub diff_include_old: Option, #[serde(flatten)] pub flags: TableInfoFlags, + #[serde(default)] + pub raw: Option, } impl Table { @@ -229,3 +231,23 @@ impl<'de> Deserialize<'de> for TableInfoFlags { ) } } + +#[derive(Deserialize)] +pub struct RawTableDefinition { + pub put: PendingStatement, + pub delete: PendingStatement, +} + +#[derive(Deserialize)] +pub struct PendingStatement { + pub sql: String, + /// This vec should contain an entry for each parameter in [sql]. 
+ pub params: Vec, +} + +#[derive(Deserialize)] +pub enum PendingStatementValue { + Id, + Column(String), + // TODO: Stuff like a raw object of put data? +} diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index aca5eb9..64ae502 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -12,6 +12,7 @@ use sqlite_nostd::{self as sqlite, ColumnType}; use sqlite_nostd::{Connection, Context}; use crate::error::SQLiteError; +use crate::schema::Schema; use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; @@ -22,6 +23,8 @@ pub struct StartSyncStream { /// Bucket parameters to include in the request when opening a sync stream. #[serde(default)] pub parameters: Option>, + #[serde(default)] + pub schema: Schema, } /// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation. diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index ed71b79..daf911f 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -9,6 +9,7 @@ use crate::{ error::SQLiteError, ext::SafeManagedStmt, operations::delete_bucket, + schema::Schema, sync::checkpoint::{validate_checkpoint, ChecksumMismatch}, sync_local::{PartialSyncOperation, SyncOperation}, }; @@ -145,6 +146,7 @@ impl StorageAdapter { &self, checkpoint: &OwnedCheckpoint, priority: Option, + schema: &Schema, ) -> Result { let mismatched_checksums = validate_checkpoint(checkpoint.buckets.values(), priority, self.db)?; @@ -201,14 +203,15 @@ impl StorageAdapter { // TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters. 
let serialized_args = serde_json::to_string(&args)?; - SyncOperation::new( + let mut sync = SyncOperation::new( self.db, Some(PartialSyncOperation { priority, args: &serialized_args, }), - ) - .apply() + ); + sync.use_schema(schema); + sync.apply() } }?; diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index d5f2f51..c3351d5 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -14,7 +14,12 @@ use alloc::{ }; use futures_lite::FutureExt; -use crate::{bson, error::SQLiteError, kv::client_id, sync::checkpoint::OwnedBucketChecksum}; +use crate::{ + bson, + error::SQLiteError, + kv::client_id, + sync::{checkpoint::OwnedBucketChecksum, interface::StartSyncStream}, +}; use sqlite_nostd::{self as sqlite, ResultCode}; use super::{ @@ -52,7 +57,7 @@ impl SyncClient { SyncControlRequest::StartSyncStream(options) => { self.state.tear_down()?; - let mut handle = SyncIterationHandle::new(self.db, options.parameters)?; + let mut handle = SyncIterationHandle::new(self.db, options)?; let instructions = handle.initialize()?; self.state = ClientState::IterationActive(handle); @@ -120,13 +125,10 @@ struct SyncIterationHandle { impl SyncIterationHandle { /// Creates a new sync iteration in a pending state by preparing statements for /// [StorageAdapter] and setting up the initial downloading state for [StorageAdapter] . 
- fn new( - db: *mut sqlite::sqlite3, - parameters: Option>, - ) -> Result { + fn new(db: *mut sqlite::sqlite3, options: StartSyncStream) -> Result { let runner = StreamingSyncIteration { db, - parameters, + options, adapter: StorageAdapter::new(db)?, status: SyncStatusContainer::new(), }; @@ -191,7 +193,7 @@ impl<'a> ActiveEvent<'a> { struct StreamingSyncIteration { db: *mut sqlite::sqlite3, adapter: StorageAdapter, - parameters: Option>, + options: StartSyncStream, status: SyncStatusContainer, } @@ -244,7 +246,9 @@ impl StreamingSyncIteration { SyncEvent::BinaryLine { data } => bson::from_bytes(data)?, SyncEvent::UploadFinished => { if let Some(checkpoint) = validated_but_not_applied.take() { - let result = self.adapter.sync_local(&checkpoint, None)?; + let result = + self.adapter + .sync_local(&checkpoint, None, &self.options.schema)?; match result { SyncLocalResult::ChangesApplied => { @@ -320,7 +324,9 @@ impl StreamingSyncIteration { ), )); }; - let result = self.adapter.sync_local(target, None)?; + let result = self + .adapter + .sync_local(target, None, &self.options.schema)?; match result { SyncLocalResult::ChecksumFailure(checkpoint_result) => { @@ -363,7 +369,9 @@ impl StreamingSyncIteration { ), )); }; - let result = self.adapter.sync_local(target, Some(priority))?; + let result = + self.adapter + .sync_local(target, Some(priority), &self.options.schema)?; match result { SyncLocalResult::ChecksumFailure(checkpoint_result) => { @@ -459,7 +467,7 @@ impl StreamingSyncIteration { raw_data: true, binary_data: true, client_id: client_id(self.db)?, - parameters: self.parameters.take(), + parameters: self.options.parameters.take(), }; event diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index f884e88..c6bc70d 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -1,16 +1,17 @@ -use alloc::collections::BTreeSet; +use alloc::collections::btree_map::BTreeMap; use alloc::format; -use alloc::string::String; +use 
alloc::string::{String, ToString}; use alloc::vec::Vec; use serde::Deserialize; use crate::error::{PSResult, SQLiteError}; +use crate::schema::{PendingStatement, PendingStatementValue, RawTableDefinition, Schema}; use crate::sync::BucketPriority; use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value}; use sqlite_nostd::{ColumnType, Connection, ResultCode}; use crate::ext::SafeManagedStmt; -use crate::util::{internal_table_name, quote_internal_name}; +use crate::util::quote_internal_name; pub fn sync_local(db: *mut sqlite::sqlite3, data: &V) -> Result { let mut operation = SyncOperation::from_args(db, data)?; @@ -27,7 +28,7 @@ pub struct PartialSyncOperation<'a> { pub struct SyncOperation<'a> { db: *mut sqlite::sqlite3, - data_tables: BTreeSet, + schema: ParsedDatabaseSchema<'a>, partial: Option>, } @@ -63,11 +64,15 @@ impl<'a> SyncOperation<'a> { pub fn new(db: *mut sqlite::sqlite3, partial: Option>) -> Self { Self { db, - data_tables: BTreeSet::new(), + schema: ParsedDatabaseSchema::new(), partial, } } + pub fn use_schema(&mut self, schema: &'a Schema) { + self.schema.add_from_schema(schema); + } + fn can_apply_sync_changes(&self) -> Result { // Don't publish downloaded data until the upload queue is empty (except for downloaded data // in priority 0, which is published earlier). @@ -126,48 +131,61 @@ impl<'a> SyncOperation<'a> { let id = statement.column_text(1)?; let data = statement.column_text(2); - let table_name = internal_table_name(type_name); - - if self.data_tables.contains(&table_name) { - let quoted = quote_internal_name(type_name, false); - - // is_err() is essentially a NULL check here. - // NULL data means no PUT operations found, so we delete the row. 
- if data.is_err() { - // DELETE - if last_delete_table.as_deref() != Some("ed) { - // Prepare statement when the table changed - last_delete_statement = Some( - self.db - .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) - .into_db_result(self.db)?, - ); - last_delete_table = Some(quoted.clone()); + if let Some(known) = self.schema.tables.get_mut(type_name) { + if let Some(raw) = &mut known.raw { + match data { + Ok(data) => { + let stmt = raw.put_statement(self.db)?; + stmt.bind_for_put(id, data)?; + stmt.stmt.exec()?; + } + Err(_) => { + let stmt = raw.delete_statement(self.db)?; + stmt.bind_for_delete(id)?; + stmt.stmt.exec()?; + } } - let delete_statement = last_delete_statement.as_mut().unwrap(); - - delete_statement.reset()?; - delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; - delete_statement.exec()?; } else { - // INSERT/UPDATE - if last_insert_table.as_deref() != Some("ed) { - // Prepare statement when the table changed - last_insert_statement = Some( - self.db - .prepare_v2(&format!( - "REPLACE INTO {}(id, data) VALUES(?, ?)", - quoted - )) - .into_db_result(self.db)?, - ); - last_insert_table = Some(quoted.clone()); + let quoted = quote_internal_name(type_name, false); + + // is_err() is essentially a NULL check here. + // NULL data means no PUT operations found, so we delete the row. 
+ if data.is_err() { + // DELETE + if last_delete_table.as_deref() != Some("ed) { + // Prepare statement when the table changed + last_delete_statement = Some( + self.db + .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) + .into_db_result(self.db)?, + ); + last_delete_table = Some(quoted.clone()); + } + let delete_statement = last_delete_statement.as_mut().unwrap(); + + delete_statement.reset()?; + delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; + delete_statement.exec()?; + } else { + // INSERT/UPDATE + if last_insert_table.as_deref() != Some("ed) { + // Prepare statement when the table changed + last_insert_statement = Some( + self.db + .prepare_v2(&format!( + "REPLACE INTO {}(id, data) VALUES(?, ?)", + quoted + )) + .into_db_result(self.db)?, + ); + last_insert_table = Some(quoted.clone()); + } + let insert_statement = last_insert_statement.as_mut().unwrap(); + insert_statement.reset()?; + insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?; + insert_statement.exec()?; } - let insert_statement = last_insert_statement.as_mut().unwrap(); - insert_statement.reset()?; - insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?; - insert_statement.exec()?; } } else { if data.is_err() { @@ -214,19 +232,7 @@ impl<'a> SyncOperation<'a> { } fn collect_tables(&mut self) -> Result<(), SQLiteError> { - // language=SQLite - let statement = self - .db - .prepare_v2( - "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'", - ) - .into_db_result(self.db)?; - - while statement.step()? 
== ResultCode::ROW { - let name = statement.column_text(0)?; - self.data_tables.insert(String::from(name)); - } - Ok(()) + self.schema.add_from_db(self.db) } fn collect_full_operations(&self) -> Result { @@ -372,3 +378,180 @@ SELECT Ok(()) } } + +struct ParsedDatabaseSchema<'a> { + tables: BTreeMap>, +} + +impl<'a> ParsedDatabaseSchema<'a> { + fn new() -> Self { + Self { + tables: BTreeMap::new(), + } + } + + fn add_from_schema(&mut self, schema: &'a Schema) { + for table in &schema.tables { + if let Some(raw) = &table.raw { + self.tables + .insert(table.name.clone(), ParsedSchemaTable::raw(raw)); + } + } + } + + fn add_from_db(&mut self, db: *mut sqlite::sqlite3) -> Result<(), SQLiteError> { + // language=SQLite + let statement = db + .prepare_v2( + "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'", + ) + .into_db_result(db)?; + + while statement.step()? == ResultCode::ROW { + let name = statement.column_text(0)?; + // Strip the ps_data__ prefix so that we can lookup tables by their sync protocol name. + let visible_name = name.get(9..).unwrap_or(name); + + // Tables which haven't been passed explicitly are assumed to not be raw tables. 
+ self.tables + .insert(String::from(visible_name), ParsedSchemaTable::json_table()); + } + Ok(()) + } +} + +struct ParsedSchemaTable<'a> { + raw: Option>, +} + +struct RawTableWithCachedStatements<'a> { + definition: &'a RawTableDefinition, + cached_put: Option>, + cached_delete: Option>, +} + +impl<'a> RawTableWithCachedStatements<'a> { + fn put_statement( + &mut self, + db: *mut sqlite::sqlite3, + ) -> Result<&PreparedPendingStatement, SQLiteError> { + let cache_slot = &mut self.cached_put; + if let None = cache_slot { + let stmt = PreparedPendingStatement::prepare(db, &self.definition.put)?; + *cache_slot = Some(stmt); + } + + return Ok(cache_slot.as_ref().unwrap()); + } + + fn delete_statement( + &mut self, + db: *mut sqlite::sqlite3, + ) -> Result<&PreparedPendingStatement, SQLiteError> { + let cache_slot = &mut self.cached_delete; + if let None = cache_slot { + let stmt = PreparedPendingStatement::prepare(db, &self.definition.delete)?; + *cache_slot = Some(stmt); + } + + return Ok(cache_slot.as_ref().unwrap()); + } +} + +impl<'a> ParsedSchemaTable<'a> { + pub const fn json_table() -> Self { + Self { raw: None } + } + + pub fn raw(definition: &'a RawTableDefinition) -> Self { + Self { + raw: Some(RawTableWithCachedStatements { + definition, + cached_put: None, + cached_delete: None, + }), + } + } +} + +struct PreparedPendingStatement<'a> { + stmt: ManagedStmt, + params: &'a [PendingStatementValue], +} + +impl<'a> PreparedPendingStatement<'a> { + pub fn prepare( + db: *mut sqlite::sqlite3, + pending: &'a PendingStatement, + ) -> Result { + let stmt = db.prepare_v2(&pending.sql)?; + // TODO: Compare number of variables / other validity checks? 
+ + Ok(Self { + stmt, + params: &pending.params, + }) + } + + pub fn bind_for_put(&self, id: &str, json_data: &str) -> Result<(), SQLiteError> { + use serde_json::Value; + + let parsed: Value = serde_json::from_str(json_data)?; + + for (i, source) in self.params.iter().enumerate() { + let i = (i + 1) as i32; + + match source { + PendingStatementValue::Id => { + self.stmt.bind_text(i, id, Destructor::STATIC)?; + } + PendingStatementValue::Column(column) => { + let parsed = parsed.as_object().ok_or_else(|| { + SQLiteError( + ResultCode::CONSTRAINT_DATATYPE, + Some("expected oplog data to be an object".to_string()), + ) + })?; + + match parsed.get(column) { + Some(Value::Bool(value)) => { + self.stmt.bind_int(i, if *value { 1 } else { 0 }) + } + Some(Value::Number(value)) => { + if let Some(value) = value.as_f64() { + // ??? there's no bind_double??? + self.stmt.bind_int64(i, value as i64) + } else if let Some(value) = value.as_u64() { + self.stmt.bind_int64(i, value as i64) + } else { + self.stmt.bind_int64(i, value.as_i64().unwrap()) + } + } + Some(Value::String(source)) => { + self.stmt.bind_text(i, &source, Destructor::STATIC) + } + _ => self.stmt.bind_null(i), + }?; + } + } + } + + Ok(()) + } + + pub fn bind_for_delete(&self, id: &str) -> Result<(), SQLiteError> { + for (i, source) in self.params.iter().enumerate() { + if let PendingStatementValue::Id = source { + self.stmt + .bind_text((i + 1) as i32, id, Destructor::STATIC)?; + } else { + return Err(SQLiteError( + ResultCode::MISUSE, + Some("Raw delete statement parameters must only reference id".to_string()), + )); + } + } + + Ok(()) + } +} diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index a9e0842..5e77768 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -32,10 +32,6 @@ pub fn quote_internal_name(name: &str, local_only: bool) -> String { } } -pub fn internal_table_name(name: &str) -> String { - return format!("ps_data__{}", name); -} - pub fn 
quote_identifier_prefixed(prefix: &str, name: &str) -> String { return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\"")); } From f28900a6b6b85495fa540ce85e407a677a405ab5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 3 Jun 2025 22:42:42 +0200 Subject: [PATCH 2/8] Extremely basic support for raw tables --- crates/core/src/sync/storage_adapter.rs | 6 ++- crates/core/src/sync_local.rs | 10 ++-- dart/test/sync_test.dart | 70 ++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 9 deletions(-) diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index daf911f..df42eb0 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -184,7 +184,11 @@ impl StorageAdapter { } let sync_result = match priority { - None => SyncOperation::new(self.db, None).apply(), + None => { + let mut sync = SyncOperation::new(self.db, None); + sync.use_schema(schema); + sync.apply() + } Some(priority) => { let args = PartialArgs { priority, diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index c6bc70d..d9bc0f7 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -136,7 +136,8 @@ impl<'a> SyncOperation<'a> { match data { Ok(data) => { let stmt = raw.put_statement(self.db)?; - stmt.bind_for_put(id, data)?; + let parsed: serde_json::Value = serde_json::from_str(data)?; + stmt.bind_for_put(id, &parsed)?; stmt.stmt.exec()?; } Err(_) => { @@ -493,11 +494,8 @@ impl<'a> PreparedPendingStatement<'a> { }) } - pub fn bind_for_put(&self, id: &str, json_data: &str) -> Result<(), SQLiteError> { + pub fn bind_for_put(&self, id: &str, json_data: &serde_json::Value) -> Result<(), SQLiteError> { use serde_json::Value; - - let parsed: Value = serde_json::from_str(json_data)?; - for (i, source) in self.params.iter().enumerate() { let i = (i + 1) as i32; @@ -506,7 +504,7 @@ impl<'a> PreparedPendingStatement<'a> { self.stmt.bind_text(i, id, 
Destructor::STATIC)?; } PendingStatementValue::Column(column) => { - let parsed = parsed.as_object().ok_or_else(|| { + let parsed = json_data.as_object().ok_or_else(|| { SQLiteError( ResultCode::CONSTRAINT_DATATYPE, Some("expected oplog data to be an object".to_string()), diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index fe78666..acd20b9 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -92,7 +92,7 @@ void _syncTests({ List pushSyncData( String bucket, String opId, String rowId, Object op, Object? data, - {int checksum = 0}) { + {int checksum = 0, String objectType = 'items'}) { return syncLine({ 'data': { 'bucket': bucket, @@ -103,7 +103,7 @@ void _syncTests({ { 'op_id': opId, 'op': op, - 'object_type': 'items', + 'object_type': objectType, 'object_id': rowId, 'checksum': checksum, 'data': json.encode(data), @@ -676,6 +676,72 @@ void _syncTests({ expect(db.select('SELECT * FROM ps_buckets'), isEmpty); }); }); + + group('raw tables', () { + syncTest('smoke test', (_) { + db.execute( + 'CREATE TABLE users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL) STRICT;'); + + invokeControl( + 'start', + json.encode({ + 'schema': { + 'tables': [ + { + 'name': 'users', + 'raw': { + 'put': { + 'sql': + 'INSERT OR REPLACE INTO users (id, name) VALUES (?, ?);', + 'params': [ + 'Id', + {'Column': 'name'} + ], + }, + 'delete': { + 'sql': 'DELETE FROM users WHERE id = ?', + 'params': ['Id'], + }, + }, + 'columns': [], + } + ], + }, + }), + ); + + // Insert + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + 'my_user', + 'PUT', + {'name': 'First user'}, + objectType: 'users', + ); + pushCheckpointComplete(); + + final users = db.select('SELECT * FROM users;'); + expect(users, [ + {'id': 'my_user', 'name': 'First user'} + ]); + + // Delete + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + 'my_user', + 'REMOVE', + null, + objectType: 'users', + ); + 
pushCheckpointComplete(); + + expect(db.select('SELECT * FROM users'), isEmpty); + }); + }); } const _schema = { From 227b58502975892025a18bf9e643c29162b4d38c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 4 Jun 2025 20:27:58 +0200 Subject: [PATCH 3/8] Use separate array for raw tables --- crates/core/src/schema/mod.rs | 5 +++-- crates/core/src/schema/table_info.rs | 15 +++++++------- crates/core/src/sync_local.rs | 14 ++++++------- dart/test/schema_test.dart | 30 ++++++++++++++++++++++++++++ dart/test/sync_test.dart | 28 ++++++++++++-------------- 5 files changed, 59 insertions(+), 33 deletions(-) diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index a4e7309..cab6c0b 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -6,13 +6,14 @@ use serde::Deserialize; use sqlite::ResultCode; use sqlite_nostd as sqlite; pub use table_info::{ - DiffIncludeOld, PendingStatement, PendingStatementValue, RawTableDefinition, Table, - TableInfoFlags, + DiffIncludeOld, PendingStatement, PendingStatementValue, RawTable, Table, TableInfoFlags, }; #[derive(Deserialize, Default)] pub struct Schema { pub tables: Vec, + #[serde(default)] + pub raw_tables: Vec, } pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs index 4da8d41..03291c7 100644 --- a/crates/core/src/schema/table_info.rs +++ b/crates/core/src/schema/table_info.rs @@ -17,8 +17,13 @@ pub struct Table { pub diff_include_old: Option, #[serde(flatten)] pub flags: TableInfoFlags, - #[serde(default)] - pub raw: Option, +} + +#[derive(Deserialize)] +pub struct RawTable { + pub name: String, + pub put: PendingStatement, + pub delete: PendingStatement, } impl Table { @@ -232,12 +237,6 @@ impl<'de> Deserialize<'de> for TableInfoFlags { } } -#[derive(Deserialize)] -pub struct RawTableDefinition { - pub put: PendingStatement, - pub delete: PendingStatement, -} - 
#[derive(Deserialize)] pub struct PendingStatement { pub sql: String, diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index d9bc0f7..7497567 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -5,7 +5,7 @@ use alloc::vec::Vec; use serde::Deserialize; use crate::error::{PSResult, SQLiteError}; -use crate::schema::{PendingStatement, PendingStatementValue, RawTableDefinition, Schema}; +use crate::schema::{PendingStatement, PendingStatementValue, RawTable, Schema}; use crate::sync::BucketPriority; use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value}; use sqlite_nostd::{ColumnType, Connection, ResultCode}; @@ -392,11 +392,9 @@ impl<'a> ParsedDatabaseSchema<'a> { } fn add_from_schema(&mut self, schema: &'a Schema) { - for table in &schema.tables { - if let Some(raw) = &table.raw { - self.tables - .insert(table.name.clone(), ParsedSchemaTable::raw(raw)); - } + for raw in &schema.raw_tables { + self.tables + .insert(raw.name.clone(), ParsedSchemaTable::raw(raw)); } } @@ -426,7 +424,7 @@ struct ParsedSchemaTable<'a> { } struct RawTableWithCachedStatements<'a> { - definition: &'a RawTableDefinition, + definition: &'a RawTable, cached_put: Option>, cached_delete: Option>, } @@ -464,7 +462,7 @@ impl<'a> ParsedSchemaTable<'a> { Self { raw: None } } - pub fn raw(definition: &'a RawTableDefinition) -> Self { + pub fn raw(definition: &'a RawTable) -> Self { Self { raw: Some(RawTableWithCachedStatements { definition, diff --git a/dart/test/schema_test.dart b/dart/test/schema_test.dart index 18a4515..8d1646b 100644 --- a/dart/test/schema_test.dart +++ b/dart/test/schema_test.dart @@ -120,6 +120,36 @@ void main() { ); }); }); + + test('raw tables', () { + db.execute('SELECT powersync_replace_schema(?)', [ + json.encode({ + 'raw_tables': [ + { + 'name': 'users', + 'put': { + 'sql': 'INSERT OR REPLACE INTO users (id, name) VALUES (?, ?);', + 'params': [ + 'Id', + {'Column': 'name'} + ], + }, + 'delete': { + 'sql': 
'DELETE FROM users WHERE id = ?', + 'params': ['Id'], + }, + } + ], + 'tables': [], + }) + ]); + + expect( + db.select( + "SELECT * FROM sqlite_schema WHERE type = 'table' AND name LIKE 'ps_data%'"), + isEmpty, + ); + }); }); } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index acd20b9..07fcf97 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -686,26 +686,24 @@ void _syncTests({ 'start', json.encode({ 'schema': { - 'tables': [ + 'raw_tables': [ { 'name': 'users', - 'raw': { - 'put': { - 'sql': - 'INSERT OR REPLACE INTO users (id, name) VALUES (?, ?);', - 'params': [ - 'Id', - {'Column': 'name'} - ], - }, - 'delete': { - 'sql': 'DELETE FROM users WHERE id = ?', - 'params': ['Id'], - }, + 'put': { + 'sql': + 'INSERT OR REPLACE INTO users (id, name) VALUES (?, ?);', + 'params': [ + 'Id', + {'Column': 'name'} + ], + }, + 'delete': { + 'sql': 'DELETE FROM users WHERE id = ?', + 'params': ['Id'], }, - 'columns': [], } ], + 'tables': [], }, }), ); From 7005f2a70787af52e02ceb0f976a1eb6dbd258d6 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 9 Jun 2025 18:01:07 +0200 Subject: [PATCH 4/8] Add function to determine whether local sync is active --- crates/core/src/lib.rs | 11 +++- crates/core/src/operations_vtab.rs | 24 ++++++-- crates/core/src/state.rs | 74 +++++++++++++++++++++++++ crates/core/src/sync/interface.rs | 6 +- crates/core/src/sync/mod.rs | 7 ++- crates/core/src/sync/storage_adapter.rs | 7 ++- crates/core/src/sync/streaming_sync.rs | 40 +++++++++---- crates/core/src/sync_local.rs | 27 +++++++-- dart/test/sync_test.dart | 41 ++++++++++++++ 9 files changed, 209 insertions(+), 28 deletions(-) create mode 100644 crates/core/src/state.rs diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 76edd45..5cb8327 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -9,9 +9,12 @@ extern crate alloc; use core::ffi::{c_char, c_int}; +use alloc::sync::Arc; use sqlite::ResultCode; use sqlite_nostd as 
sqlite; +use crate::state::DatabaseState; + mod bson; mod checkpoint; mod crud_vtab; @@ -26,6 +29,7 @@ mod migrations; mod operations; mod operations_vtab; mod schema; +mod state; mod sync; mod sync_local; mod util; @@ -53,6 +57,8 @@ pub extern "C" fn sqlite3_powersync_init( } fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + let state = Arc::new(DatabaseState::new()); + crate::version::register(db)?; crate::views::register(db)?; crate::uuid::register(db)?; @@ -62,10 +68,11 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { crate::view_admin::register(db)?; crate::checkpoint::register(db)?; crate::kv::register(db)?; - sync::register(db)?; + crate::state::register(db, state.clone())?; + sync::register(db, state.clone())?; crate::schema::register(db)?; - crate::operations_vtab::register(db)?; + crate::operations_vtab::register(db, state.clone())?; crate::crud_vtab::register(db)?; Ok(()) diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs index 96b5506..bb60308 100644 --- a/crates/core/src/operations_vtab.rs +++ b/crates/core/src/operations_vtab.rs @@ -1,6 +1,7 @@ extern crate alloc; use alloc::boxed::Box; +use alloc::sync::Arc; use core::ffi::{c_char, c_int, c_void}; use sqlite::{Connection, ResultCode, Value}; @@ -9,6 +10,7 @@ use sqlite_nostd as sqlite; use crate::operations::{ clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation, }; +use crate::state::DatabaseState; use crate::sync_local::sync_local; use crate::vtab_util::*; @@ -16,6 +18,7 @@ use crate::vtab_util::*; struct VirtualTable { base: sqlite::vtab, db: *mut sqlite::sqlite3, + state: Arc, target_applied: bool, target_validated: bool, @@ -23,7 +26,7 @@ struct VirtualTable { extern "C" fn connect( db: *mut sqlite::sqlite3, - _aux: *mut c_void, + aux: *mut c_void, _argc: c_int, _argv: *const *const c_char, vtab: *mut *mut sqlite::vtab, @@ -43,6 +46,14 @@ extern "C" fn connect( zErrMsg: core::ptr::null_mut(), 
}, db, + state: { + // Increase refcount - we can't use from_raw alone because we don't own the aux + // data (connect could be called multiple times). + let state = Arc::from_raw(aux as *mut DatabaseState); + let clone = state.clone(); + core::mem::forget(state); + clone + }, target_validated: false, target_applied: false, })); @@ -83,7 +94,7 @@ extern "C" fn update( let result = insert_operation(db, args[3].text()); vtab_result(vtab, result) } else if op == "sync_local" { - let result = sync_local(db, &args[3]); + let result = sync_local(&tab.state, db, &args[3]); if let Ok(result_row) = result { unsafe { *p_row_id = result_row; @@ -139,8 +150,13 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module { xIntegrity: None, }; -pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { - db.create_module_v2("powersync_operations", &MODULE, None, None)?; +pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<(), ResultCode> { + db.create_module_v2( + "powersync_operations", + &MODULE, + Some(Arc::into_raw(state) as *mut c_void), + Some(DatabaseState::destroy_arc), + )?; Ok(()) } diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs new file mode 100644 index 0000000..79bbb4a --- /dev/null +++ b/crates/core/src/state.rs @@ -0,0 +1,74 @@ +use core::{ + ffi::{c_int, c_void}, + sync::atomic::{AtomicBool, Ordering}, +}; + +use alloc::sync::Arc; +use sqlite::{Connection, ResultCode}; +use sqlite_nostd::{self as sqlite, Context}; + +/// State that is shared for a SQLite database connection after the core extension has been +/// registered on it. +/// +/// `init_extension` allocates an instance of this in an `Arc` that is shared as user-data for +/// functions/vtabs that need access to it. 
+pub struct DatabaseState { + pub is_in_sync_local: AtomicBool, +} + +impl DatabaseState { + pub fn new() -> Self { + DatabaseState { + is_in_sync_local: AtomicBool::new(false), + } + } + + pub fn sync_local_guard<'a>(&'a self) -> impl Drop + use<'a> { + self.is_in_sync_local + .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire) + .expect("should not be syncing already"); + + struct ClearOnDrop<'a>(&'a DatabaseState); + + impl Drop for ClearOnDrop<'_> { + fn drop(&mut self) { + self.0.is_in_sync_local.store(false, Ordering::Release); + } + } + + ClearOnDrop(self) + } + + pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) { + drop(Arc::from_raw(ptr.cast::())); + } +} + +pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<(), ResultCode> { + unsafe extern "C" fn func( + ctx: *mut sqlite::context, + _argc: c_int, + _argv: *mut *mut sqlite::value, + ) { + let data = ctx.user_data().cast::(); + let data = unsafe { data.as_ref() }.unwrap(); + + ctx.result_int(if data.is_in_sync_local.load(Ordering::Relaxed) { + 1 + } else { + 0 + }); + } + + db.create_function_v2( + "powersync_in_sync_operation", + 0, + 0, + Some(Arc::into_raw(state) as *mut c_void), + Some(func), + None, + None, + Some(DatabaseState::destroy_arc), + )?; + Ok(()) +} diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index 64ae502..9afc2d5 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -5,6 +5,7 @@ use alloc::borrow::Cow; use alloc::boxed::Box; use alloc::rc::Rc; use alloc::string::ToString; +use alloc::sync::Arc; use alloc::{string::String, vec::Vec}; use serde::{Deserialize, Serialize}; use sqlite::{ResultCode, Value}; @@ -13,6 +14,7 @@ use sqlite_nostd::{Connection, Context}; use crate::error::SQLiteError; use crate::schema::Schema; +use crate::state::DatabaseState; use super::streaming_sync::SyncClient; use super::sync_status::DownloadSyncStatus; @@ -121,7 +123,7 @@ struct SqlController { 
client: SyncClient, } -pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { +pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<(), ResultCode> { extern "C" fn control( ctx: *mut sqlite::context, argc: c_int, @@ -202,7 +204,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { } let controller = Box::new(SqlController { - client: SyncClient::new(db), + client: SyncClient::new(db, state), }); db.create_function_v2( diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index fb4f02c..2a28044 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -1,3 +1,4 @@ +use alloc::sync::Arc; use sqlite_nostd::{self as sqlite, ResultCode}; mod bucket_priority; @@ -13,6 +14,8 @@ mod sync_status; pub use bucket_priority::BucketPriority; pub use checksum::Checksum; -pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { - interface::register(db) +use crate::state::DatabaseState; + +pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<(), ResultCode> { + interface::register(db, state) } diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index df42eb0..d866a06 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -1,6 +1,6 @@ use core::{assert_matches::debug_assert_matches, fmt::Display}; -use alloc::{string::ToString, vec::Vec}; +use alloc::{string::ToString, sync::Arc, vec::Vec}; use serde::Serialize; use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode}; use streaming_iterator::StreamingIterator; @@ -10,6 +10,7 @@ use crate::{ ext::SafeManagedStmt, operations::delete_bucket, schema::Schema, + state::DatabaseState, sync::checkpoint::{validate_checkpoint, ChecksumMismatch}, sync_local::{PartialSyncOperation, SyncOperation}, }; @@ -144,6 +145,7 @@ impl StorageAdapter { pub fn sync_local( &self, + state: &DatabaseState, checkpoint: &OwnedCheckpoint, priority: 
Option, schema: &Schema, @@ -185,7 +187,7 @@ impl StorageAdapter { let sync_result = match priority { None => { - let mut sync = SyncOperation::new(self.db, None); + let mut sync = SyncOperation::new(state, self.db, None); sync.use_schema(schema); sync.apply() } @@ -208,6 +210,7 @@ impl StorageAdapter { // TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters. let serialized_args = serde_json::to_string(&args)?; let mut sync = SyncOperation::new( + state, self.db, Some(PartialSyncOperation { priority, diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index c3351d5..0fac873 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -10,6 +10,7 @@ use alloc::{ collections::{btree_map::BTreeMap, btree_set::BTreeSet}, format, string::{String, ToString}, + sync::Arc, vec::Vec, }; use futures_lite::FutureExt; @@ -18,6 +19,7 @@ use crate::{ bson, error::SQLiteError, kv::client_id, + state::DatabaseState, sync::{checkpoint::OwnedBucketChecksum, interface::StartSyncStream}, }; use sqlite_nostd::{self as sqlite, ResultCode}; @@ -37,14 +39,16 @@ use super::{ /// initialized. pub struct SyncClient { db: *mut sqlite::sqlite3, + db_state: Arc, /// The current [ClientState] (essentially an optional [StreamingSyncIteration]). 
state: ClientState, } impl SyncClient { - pub fn new(db: *mut sqlite::sqlite3) -> Self { + pub fn new(db: *mut sqlite::sqlite3, state: Arc) -> Self { Self { db, + db_state: state, state: ClientState::Idle, } } @@ -57,7 +61,7 @@ impl SyncClient { SyncControlRequest::StartSyncStream(options) => { self.state.tear_down()?; - let mut handle = SyncIterationHandle::new(self.db, options)?; + let mut handle = SyncIterationHandle::new(self.db, options, self.db_state.clone())?; let instructions = handle.initialize()?; self.state = ClientState::IterationActive(handle); @@ -125,10 +129,15 @@ struct SyncIterationHandle { impl SyncIterationHandle { /// Creates a new sync iteration in a pending state by preparing statements for /// [StorageAdapter] and setting up the initial downloading state for [StorageAdapter] . - fn new(db: *mut sqlite::sqlite3, options: StartSyncStream) -> Result { + fn new( + db: *mut sqlite::sqlite3, + options: StartSyncStream, + state: Arc, + ) -> Result { let runner = StreamingSyncIteration { db, options, + state, adapter: StorageAdapter::new(db)?, status: SyncStatusContainer::new(), }; @@ -192,6 +201,7 @@ impl<'a> ActiveEvent<'a> { struct StreamingSyncIteration { db: *mut sqlite::sqlite3, + state: Arc, adapter: StorageAdapter, options: StartSyncStream, status: SyncStatusContainer, @@ -246,9 +256,12 @@ impl StreamingSyncIteration { SyncEvent::BinaryLine { data } => bson::from_bytes(data)?, SyncEvent::UploadFinished => { if let Some(checkpoint) = validated_but_not_applied.take() { - let result = - self.adapter - .sync_local(&checkpoint, None, &self.options.schema)?; + let result = self.adapter.sync_local( + &self.state, + &checkpoint, + None, + &self.options.schema, + )?; match result { SyncLocalResult::ChangesApplied => { @@ -324,9 +337,9 @@ impl StreamingSyncIteration { ), )); }; - let result = self - .adapter - .sync_local(target, None, &self.options.schema)?; + let result = + self.adapter + .sync_local(&self.state, target, None, &self.options.schema)?; 
match result { SyncLocalResult::ChecksumFailure(checkpoint_result) => { @@ -369,9 +382,12 @@ impl StreamingSyncIteration { ), )); }; - let result = - self.adapter - .sync_local(target, Some(priority), &self.options.schema)?; + let result = self.adapter.sync_local( + &self.state, + target, + Some(priority), + &self.options.schema, + )?; match result { SyncLocalResult::ChecksumFailure(checkpoint_result) => { diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 7497567..3d91a68 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -6,6 +6,7 @@ use serde::Deserialize; use crate::error::{PSResult, SQLiteError}; use crate::schema::{PendingStatement, PendingStatementValue, RawTable, Schema}; +use crate::state::DatabaseState; use crate::sync::BucketPriority; use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value}; use sqlite_nostd::{ColumnType, Connection, ResultCode}; @@ -13,8 +14,12 @@ use sqlite_nostd::{ColumnType, Connection, ResultCode}; use crate::ext::SafeManagedStmt; use crate::util::quote_internal_name; -pub fn sync_local(db: *mut sqlite::sqlite3, data: &V) -> Result { - let mut operation = SyncOperation::from_args(db, data)?; +pub fn sync_local( + state: &DatabaseState, + db: *mut sqlite::sqlite3, + data: &V, +) -> Result { + let mut operation: SyncOperation<'_> = SyncOperation::from_args(state, db, data)?; operation.apply() } @@ -27,14 +32,20 @@ pub struct PartialSyncOperation<'a> { } pub struct SyncOperation<'a> { + state: &'a DatabaseState, db: *mut sqlite::sqlite3, schema: ParsedDatabaseSchema<'a>, partial: Option>, } impl<'a> SyncOperation<'a> { - fn from_args(db: *mut sqlite::sqlite3, data: &'a V) -> Result { + fn from_args( + state: &'a DatabaseState, + db: *mut sqlite::sqlite3, + data: &'a V, + ) -> Result { Ok(Self::new( + state, db, match data.value_type() { ColumnType::Text => { @@ -61,8 +72,13 @@ impl<'a> SyncOperation<'a> { )) } - pub fn new(db: *mut sqlite::sqlite3, partial: Option>) 
-> Self { + pub fn new( + state: &'a DatabaseState, + db: *mut sqlite::sqlite3, + partial: Option>, + ) -> Self { Self { + state, db, schema: ParsedDatabaseSchema::new(), partial, @@ -109,6 +125,8 @@ impl<'a> SyncOperation<'a> { } pub fn apply(&mut self) -> Result { + let guard = self.state.sync_local_guard(); + if !self.can_apply_sync_changes()? { return Ok(0); } @@ -229,6 +247,7 @@ impl<'a> SyncOperation<'a> { self.set_last_applied_op()?; self.mark_completed()?; + drop(guard); Ok(1) } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 07fcf97..47869de 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -677,6 +677,47 @@ void _syncTests({ }); }); + syncTest('sets powersync_in_sync_operation', (_) { + var [row] = db.select('SELECT powersync_in_sync_operation() as r'); + expect(row, {'r': 0}); + + var testInSyncInvocations = []; + + db.createFunction( + functionName: 'test_in_sync', + function: (args) { + testInSyncInvocations.add((args[0] as int) != 0); + return null; + }, + argumentCount: const AllowedArgumentCount(1), + directOnly: false, + ); + + db.execute(''' +CREATE TRIGGER foo AFTER INSERT ON ps_data__items BEGIN + SELECT test_in_sync(powersync_in_sync_operation()); +END; +'''); + + // Run an insert sync iteration to start the trigger + invokeControl('start', null); + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + '1', + 'PUT', + {'col': 'foo'}, + objectType: 'items', + ); + pushCheckpointComplete(); + + expect(testInSyncInvocations, [true]); + + [row] = db.select('SELECT powersync_in_sync_operation() as r'); + expect(row, {'r': 0}); + }); + group('raw tables', () { syncTest('smoke test', (_) { db.execute( From bfb14b81b2170f4f2c96816c71d21c59d595aa18 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 10 Jun 2025 17:16:11 +0200 Subject: [PATCH 5/8] Disallow local writes during sync --- crates/core/src/crud_vtab.rs | 34 +++++++++++++++---- crates/core/src/lib.rs | 2 +- 
crates/core/src/sync/storage_adapter.rs | 2 +- dart/test/sync_test.dart | 44 +++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 9 deletions(-) diff --git a/crates/core/src/crud_vtab.rs b/crates/core/src/crud_vtab.rs index d536390..b51c4f6 100644 --- a/crates/core/src/crud_vtab.rs +++ b/crates/core/src/crud_vtab.rs @@ -2,9 +2,10 @@ extern crate alloc; use alloc::boxed::Box; use alloc::string::String; +use alloc::sync::Arc; use const_format::formatcp; use core::ffi::{c_char, c_int, c_void, CStr}; -use core::ptr::null_mut; +use core::sync::atomic::Ordering; use serde::Serialize; use serde_json::value::RawValue; @@ -15,6 +16,7 @@ use sqlite_nostd::{self as sqlite, ColumnType}; use crate::error::SQLiteError; use crate::ext::SafeManagedStmt; use crate::schema::TableInfoFlags; +use crate::state::DatabaseState; use crate::util::MAX_OP_ID; use crate::vtab_util::*; @@ -41,6 +43,7 @@ struct VirtualTable { db: *mut sqlite::sqlite3, current_tx: Option, is_simple: bool, + state: Arc, } struct ActiveCrudTransaction { @@ -86,6 +89,15 @@ impl VirtualTable { .ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?; let db = self.db; + if self.state.is_in_sync_local.load(Ordering::Relaxed) { + // Don't collect CRUD writes while we're syncing the local database - writes made here + // aren't writes we should upload. + // This normally doesn't happen because we insert directly into the data tables, but + // users might have custom raw tables used for syncing with triggers on them to call + // this function. And those specifically should not trigger here. 
+ return Ok(()); + } + match &mut current_tx.mode { CrudTransactionMode::Manual(manual) => { // Columns are (data TEXT, options INT HIDDEN) @@ -258,7 +270,7 @@ fn prepare_lazy( extern "C" fn connect( db: *mut sqlite::sqlite3, - _aux: *mut c_void, + aux: *mut c_void, argc: c_int, argv: *const *const c_char, vtab: *mut *mut sqlite::vtab, @@ -289,6 +301,14 @@ extern "C" fn connect( pModule: core::ptr::null(), zErrMsg: core::ptr::null_mut(), }, + state: { + // Increase refcount - we can't use from_raw alone because we don't own the aux + // data (connect could be called multiple times). + let state = Arc::from_raw(aux as *mut DatabaseState); + let clone = state.clone(); + core::mem::forget(state); + clone + }, db, current_tx: None, is_simple, @@ -380,20 +400,20 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module { xIntegrity: None, }; -pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { +pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<(), ResultCode> { sqlite::convert_rc(sqlite::create_module_v2( db, SIMPLE_NAME.as_ptr(), &MODULE, - null_mut(), - None, + Arc::into_raw(state.clone()) as *mut c_void, + Some(DatabaseState::destroy_arc), ))?; sqlite::convert_rc(sqlite::create_module_v2( db, MANUAL_NAME.as_ptr(), &MODULE, - null_mut(), - None, + Arc::into_raw(state) as *mut c_void, + Some(DatabaseState::destroy_arc), ))?; Ok(()) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5cb8327..da0438c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -73,7 +73,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { crate::schema::register(db)?; crate::operations_vtab::register(db, state.clone())?; - crate::crud_vtab::register(db)?; + crate::crud_vtab::register(db, state)?; Ok(()) } diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index d866a06..2b91118 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ 
b/crates/core/src/sync/storage_adapter.rs @@ -1,6 +1,6 @@ use core::{assert_matches::debug_assert_matches, fmt::Display}; -use alloc::{string::ToString, sync::Arc, vec::Vec}; +use alloc::{string::ToString, vec::Vec}; use serde::Serialize; use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode}; use streaming_iterator::StreamingIterator; diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 47869de..bc0c6fb 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -780,6 +780,50 @@ END; expect(db.select('SELECT * FROM users'), isEmpty); }); + + test("can't use crud vtab during sync", () { + db.execute( + 'CREATE TABLE users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL) STRICT;'); + + invokeControl( + 'start', + json.encode({ + 'schema': { + 'raw_tables': [ + { + 'name': 'users', + 'put': { + // Inserting into powersync_crud_ during a sync operation is + // forbidden, that vtab should only collect local writes. + 'sql': "INSERT INTO powersync_crud_(data) VALUES (?);", + 'params': [ + {'Column': 'name'} + ], + }, + 'delete': { + 'sql': 'DELETE FROM users WHERE id = ?', + 'params': ['Id'], + }, + } + ], + 'tables': [], + }, + }), + ); + + // Insert + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + 'my_user', + 'PUT', + {'name': 'First user'}, + objectType: 'users', + ); + + expect(pushCheckpointComplete, throwsA(isA())); + }); }); } From b7eacc31b1ed70d144c932b048c58e8ab3401522 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 11 Jun 2025 10:51:13 +0200 Subject: [PATCH 6/8] At least mention new format in options --- docs/sync.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/sync.md b/docs/sync.md index b24684b..f5759a0 100644 --- a/docs/sync.md +++ b/docs/sync.md @@ -13,6 +13,9 @@ The following commands are supported: The payload can either be `null` or an JSON object with: - An optional `parameters: Record` entry, specifying parameters to include in the request to 
the sync service. + - A `schema: { tables: Table[], raw_tables: RawTable[] }` entry specifying the schema of the database to + use. Regular tables are also inferred from the database itself, but raw tables need to be specified. + If no raw tables are used, the `schema` entry can be omitted. 2. `stop`: No payload, requests the current sync iteration (if any) to be shut down. 3. `line_text`: Payload is a serialized JSON object received from the sync service. 4. `line_binary`: Payload is a BSON-encoded object received from the sync service. From 18d87e3b8486836b0e0d379b3323b86f092e25df Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 23 Jun 2025 07:50:44 -0400 Subject: [PATCH 7/8] Noop instead of exception --- dart/test/sync_test.dart | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index bc0c6fb..459dc87 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -781,7 +781,7 @@ END; expect(db.select('SELECT * FROM users'), isEmpty); }); - test("can't use crud vtab during sync", () { + test("crud vtab is no-op during sync", () { db.execute( 'CREATE TABLE users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL) STRICT;'); @@ -793,8 +793,6 @@ END; { 'name': 'users', 'put': { - // Inserting into powersync_crud_ during a sync operation is - // forbidden, that vtab should only collect local writes. 
'sql': "INSERT INTO powersync_crud_(data) VALUES (?);", 'params': [ {'Column': 'name'} @@ -822,7 +820,8 @@ END; objectType: 'users', ); - expect(pushCheckpointComplete, throwsA(isA())); + pushCheckpointComplete(); + expect(db.select('SELECT * FROM ps_crud'), isEmpty); }); }); } From cf68afd023556fe96230cb3dde307d71c364c3a9 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 23 Jun 2025 14:45:26 -0400 Subject: [PATCH 8/8] Bind doubles, compare parameters --- crates/core/src/sync_local.rs | 17 ++++++++++++++--- sqlite-rs-embedded | 2 +- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 3d91a68..a5ce465 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -501,8 +501,20 @@ impl<'a> PreparedPendingStatement<'a> { pub fn prepare( db: *mut sqlite::sqlite3, pending: &'a PendingStatement, - ) -> Result { + ) -> Result { let stmt = db.prepare_v2(&pending.sql)?; + if stmt.bind_parameter_count() as usize != pending.params.len() { + return Err(SQLiteError( + ResultCode::MISUSE, + Some(format!( + "Statement {} has {} parameters, but {} values were provided as sources.", + &pending.sql, + stmt.bind_parameter_count(), + pending.params.len(), + )), + )); + } + // TODO: Compare number of variables / other validity checks? Ok(Self { @@ -534,8 +546,7 @@ impl<'a> PreparedPendingStatement<'a> { } Some(Value::Number(value)) => { if let Some(value) = value.as_f64() { - // ??? there's no bind_double??? - self.stmt.bind_int64(i, value as i64) + self.stmt.bind_double(i, value) } else if let Some(value) = value.as_u64() { self.stmt.bind_int64(i, value as i64) } else { diff --git a/sqlite-rs-embedded b/sqlite-rs-embedded index 8f453f3..0ea6d5f 160000 --- a/sqlite-rs-embedded +++ b/sqlite-rs-embedded @@ -1 +1 @@ -Subproject commit 8f453f36aef9f889d355fb86bcd6ce680a95a00d +Subproject commit 0ea6d5f32b9281437ee1cdad5a1931b3f02b9789