Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 86 additions & 41 deletions crates/core/src/crud_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,20 @@ struct ActiveCrudTransaction {
}

enum CrudTransactionMode {
Manual {
stmt: ManagedStmt,
},
Simple {
stmt: ManagedStmt,
set_updated_rows: ManagedStmt,
update_local_bucket: ManagedStmt,
},
Manual(ManualCrudTransactionMode),
Simple(SimpleCrudTransactionMode),
}

#[derive(Default)]
struct ManualCrudTransactionMode {
stmt: Option<ManagedStmt>,
}

#[derive(Default)]
struct SimpleCrudTransactionMode {
stmt: Option<ManagedStmt>,
set_updated_rows: Option<ManagedStmt>,
had_writes: bool,
}

impl VirtualTable {
Expand All @@ -73,31 +79,29 @@ impl VirtualTable {
}
}

fn handle_insert(&self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> {
fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> {
let current_tx = self
.current_tx
.as_ref()
.as_mut()
.ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?;
let db = self.db;

match &current_tx.mode {
CrudTransactionMode::Manual { stmt } => {
match &mut current_tx.mode {
CrudTransactionMode::Manual(manual) => {
// Columns are (data TEXT, options INT HIDDEN)
let data = args[0].text();
let flags = match args[1].value_type() {
sqlite_nostd::ColumnType::Null => TableInfoFlags::default(),
_ => TableInfoFlags(args[1].int() as u32),
};

let stmt = manual.raw_crud_statement(db)?;
stmt.bind_int64(1, current_tx.tx_id)?;
stmt.bind_text(2, data, sqlite::Destructor::STATIC)?;
stmt.bind_int(3, flags.0 as i32)?;
stmt.exec()?;
}
CrudTransactionMode::Simple {
stmt,
set_updated_rows,
update_local_bucket,
} => {
CrudTransactionMode::Simple(simple) => {
// Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN)
let flags = match args[6].value_type() {
sqlite_nostd::ColumnType::Null => TableInfoFlags::default(),
Expand Down Expand Up @@ -133,6 +137,7 @@ impl VirtualTable {

// First, we insert into ps_crud like the manual vtab would too. We have to create
// the JSON out of the individual components for that.
let stmt = simple.raw_crud_statement(db)?;
stmt.bind_int64(1, current_tx.tx_id)?;

let serialized = serde_json::to_string(&CrudEntry {
Expand All @@ -151,10 +156,11 @@ impl VirtualTable {
stmt.exec()?;

// However, we also set ps_updated_rows and update the $local bucket
let set_updated_rows = simple.set_updated_rows_statement(db)?;
set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?;
set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?;
set_updated_rows.exec()?;
update_local_bucket.exec()?;
simple.record_local_write(db)?;
}
}

Expand All @@ -176,39 +182,78 @@ impl VirtualTable {
self.current_tx = Some(ActiveCrudTransaction {
tx_id,
mode: if self.is_simple {
CrudTransactionMode::Simple {
// language=SQLite
stmt: db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)?,
// language=SQLite
set_updated_rows: db.prepare_v3(
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
0,
)?,
update_local_bucket: db.prepare_v3(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"), 0)?,
}
CrudTransactionMode::Simple(Default::default())
} else {
const SQL: &str = formatcp!(
"\
CrudTransactionMode::Manual(Default::default())
},
});

Ok(())
}

fn end_transaction(&mut self) {
self.current_tx = None;
}
}

impl ManualCrudTransactionMode {
fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> {
prepare_lazy(&mut self.stmt, || {
const SQL: &str = formatcp!(
"\
WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
INSERT INTO ps_crud(tx_id, data)
SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
",
TableInfoFlags::IGNORE_EMPTY_UPDATE
);
TableInfoFlags::IGNORE_EMPTY_UPDATE
);

let insert_statement = db.prepare_v3(SQL, 0)?;
CrudTransactionMode::Manual {
stmt: insert_statement,
}
},
});
db.prepare_v3(SQL, 0)
})
}
}

impl SimpleCrudTransactionMode {
fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> {
prepare_lazy(&mut self.stmt, || {
// language=SQLite
db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)
})
}

fn set_updated_rows_statement(
&mut self,
db: *mut sqlite::sqlite3,
) -> Result<&ManagedStmt, ResultCode> {
prepare_lazy(&mut self.set_updated_rows, || {
// language=SQLite
db.prepare_v3(
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
0,
)
})
}

fn record_local_write(&mut self, db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
if !self.had_writes {
db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?;
self.had_writes = true;
}

Ok(())
}
}

fn end_transaction(&mut self) {
self.current_tx = None;
/// A variant of `Option.get_or_insert` that handles insertions returning errors.
fn prepare_lazy(
stmt: &mut Option<ManagedStmt>,
prepare: impl FnOnce() -> Result<ManagedStmt, ResultCode>,
) -> Result<&ManagedStmt, ResultCode> {
if let None = stmt {
*stmt = Some(prepare()?);
}

return Ok(unsafe { stmt.as_ref().unwrap_unchecked() });
}

extern "C" fn connect(
Expand Down Expand Up @@ -295,7 +340,7 @@ extern "C" fn update(
ResultCode::MISUSE as c_int
} else if rowid.value_type() == sqlite::ColumnType::Null {
// INSERT
let tab = unsafe { &*(vtab.cast::<VirtualTable>()) };
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
let result = tab.handle_insert(&args[2..]);
vtab_result(vtab, result)
} else {
Expand Down
45 changes: 45 additions & 0 deletions dart/test/crud_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,51 @@ void main() {
'{"op":"PUT","id":"foo","type":"users","data":{"my":"value"},"old":{"previous":"value"}}',
});
});

test('resets state after commit', () {
db.execute('BEGIN');
db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo',
'users',
]);
db.execute('commit');

db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo',
'users',
]);
expect(db.select('SELECT * FROM ps_crud').map((r) => r['tx_id']),
[1, 2]);
});

test('resets state after rollback', () {
db.execute('BEGIN');
db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo',
'users',
]);
db.execute('rollback');

db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo2',
'users',
]);
expect(db.select('SELECT * FROM ps_crud'), [
{
'id': 1,
'data': '{"op":"DELETE","id":"foo2","type":"users"}',
'tx_id': 1,
}
]);
});
});
});

Expand Down