diff --git a/Cargo.lock b/Cargo.lock
index 4c081087..5f37d73d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -252,7 +252,6 @@ dependencies = [
  "serde",
  "serde_json",
  "sqlite_nostd",
- "streaming-iterator",
  "uuid",
 ]
 
@@ -418,12 +417,6 @@ dependencies = [
  "sqlite3_capi",
 ]
 
-[[package]]
-name = "streaming-iterator"
-version = "0.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b2231b7c3057d5e4ad0156fb3dc807d900806020c5ffa3ee6ff2c8c76fb8520"
-
 [[package]]
 name = "syn"
 version = "1.0.109"
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index a4bc4d18..e39d2b72 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -22,7 +22,6 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv
 const_format = "0.2.34"
 futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] }
 rustc-hash = { version = "2.1", default-features = false }
-streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }
 
 [dependencies.uuid]
 version = "1.4.1"
diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs
index ed71b79b..c2d0fdea 100644
--- a/crates/core/src/sync/storage_adapter.rs
+++ b/crates/core/src/sync/storage_adapter.rs
@@ -3,7 +3,6 @@ use core::{assert_matches::debug_assert_matches, fmt::Display};
 use alloc::{string::ToString, vec::Vec};
 use serde::Serialize;
 use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
-use streaming_iterator::StreamingIterator;
 
 use crate::{
     error::SQLiteError,
@@ -25,7 +24,7 @@ use super::{
 /// used frequently as an optimization, but we're not taking advantage of that yet.
 pub struct StorageAdapter {
     pub db: *mut sqlite::sqlite3,
-    progress_stmt: ManagedStmt,
+    pub progress_stmt: ManagedStmt,
     time_stmt: ManagedStmt,
 }
 
@@ -78,37 +77,22 @@ impl StorageAdapter {
         Ok(())
     }
 
-    pub fn local_progress(
-        &self,
-    ) -> Result<
-        impl StreamingIterator<Item = Result<PersistedBucketProgress<'_>, ResultCode>>,
-        ResultCode,
-    > {
-        self.progress_stmt.reset()?;
-
-        fn step(stmt: &ManagedStmt) -> Result<Option<PersistedBucketProgress<'_>>, ResultCode> {
-            if stmt.step()? == ResultCode::ROW {
-                let bucket = stmt.column_text(0)?;
-                let count_at_last = stmt.column_int64(1);
-                let count_since_last = stmt.column_int64(2);
-
-                return Ok(Some(PersistedBucketProgress {
-                    bucket,
-                    count_at_last,
-                    count_since_last,
-                }));
-            }
-
+    pub fn step_progress(&self) -> Result<Option<PersistedBucketProgress<'_>>, ResultCode> {
+        if self.progress_stmt.step()? == ResultCode::ROW {
+            let bucket = self.progress_stmt.column_text(0)?;
+            let count_at_last = self.progress_stmt.column_int64(1);
+            let count_since_last = self.progress_stmt.column_int64(2);
+
+            Ok(Some(PersistedBucketProgress {
+                bucket,
+                count_at_last,
+                count_since_last,
+            }))
+        } else {
+            // Done
+            self.progress_stmt.reset()?;
             Ok(None)
         }
-
-        Ok(streaming_iterator::from_fn(|| {
-            match step(&self.progress_stmt) {
-                Err(e) => Some(Err(e)),
-                Ok(Some(other)) => Some(Ok(other)),
-                Ok(None) => None,
-            }
-        }))
     }
 
     pub fn reset_progress(&self) -> Result<(), ResultCode> {
@@ -239,10 +223,11 @@ impl StorageAdapter {
     }
 
     pub fn now(&self) -> Result<Timestamp, ResultCode> {
-        self.time_stmt.reset()?;
         self.time_stmt.step()?;
+        let res = Timestamp(self.time_stmt.column_int64(0));
+        self.time_stmt.reset()?;
 
-        Ok(Timestamp(self.time_stmt.column_int64(0)))
+        Ok(res)
     }
 }
 
diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs
index d5f2f516..1c74a587 100644
--- a/crates/core/src/sync/streaming_sync.rs
+++ b/crates/core/src/sync/streaming_sync.rs
@@ -425,11 +425,10 @@ impl StreamingSyncIteration {
         &self,
         checkpoint: &OwnedCheckpoint,
     ) -> Result {
-        let local_progress = self.adapter.local_progress()?;
         let SyncProgressFromCheckpoint {
             progress,
             needs_counter_reset,
-        } = SyncDownloadProgress::for_checkpoint(checkpoint, local_progress)?;
+        } = SyncDownloadProgress::for_checkpoint(checkpoint, &self.adapter)?;
 
         if needs_counter_reset {
             self.adapter.reset_progress()?;
diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs
index e6744fa7..95340950 100644
--- a/crates/core/src/sync/sync_status.rs
+++ b/crates/core/src/sync/sync_status.rs
@@ -3,11 +3,12 @@ use core::{cell::RefCell, hash::BuildHasher};
 use rustc_hash::FxBuildHasher;
 use serde::Serialize;
 use sqlite_nostd::ResultCode;
-use streaming_iterator::StreamingIterator;
+
+use crate::sync::storage_adapter::StorageAdapter;
 
 use super::{
     bucket_priority::BucketPriority, interface::Instruction, line::DataLine,
-    storage_adapter::PersistedBucketProgress, streaming_sync::OwnedCheckpoint,
+    streaming_sync::OwnedCheckpoint,
 };
 
 /// Information about a progressing download.
@@ -187,9 +188,7 @@ pub struct SyncProgressFromCheckpoint {
 impl SyncDownloadProgress {
     pub fn for_checkpoint<'a>(
         checkpoint: &OwnedCheckpoint,
-        mut local_progress: impl StreamingIterator<
-            Item = Result<PersistedBucketProgress<'a>, ResultCode>,
-        >,
+        adapter: &StorageAdapter,
     ) -> Result<SyncProgressFromCheckpoint, ResultCode> {
         let mut buckets = BTreeMap::::new();
         let mut needs_reset = false;
@@ -206,12 +205,11 @@ impl SyncDownloadProgress {
             );
         }
 
-        while let Some(row) = local_progress.next() {
-            let row = match row {
-                Ok(row) => row,
-                Err(e) => return Err(*e),
-            };
+        // Go through local bucket states to detect pending progress from previous sync iterations
+        // that may have been interrupted.
+        adapter.progress_stmt.reset()?;
+        while let Some(row) = adapter.step_progress()? {
             let Some(progress) = buckets.get_mut(row.bucket) else {
                 continue;
             };
 
@@ -232,6 +230,8 @@ impl SyncDownloadProgress {
             }
         }
 
+        adapter.progress_stmt.reset()?;
+
         Ok(SyncProgressFromCheckpoint {
             progress: Self { buckets },
             needs_counter_reset: needs_reset,
diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart
index fe786660..a55c6a09 100644
--- a/dart/test/sync_test.dart
+++ b/dart/test/sync_test.dart
@@ -52,6 +52,14 @@ void _syncTests({
     db.execute('begin');
     final [row] =
         db.select('SELECT powersync_control(?, ?)', [operation, data]);
+
+    // Make sure that powersync_control doesn't leave any busy statements
+    // behind.
+    // TODO: Re-enable after we can guarantee sqlite_stmt being available
+    // const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;';
+    // final busy = db.select(statement, [statement]);
+    // expect(busy, isEmpty);
+
     db.execute('commit');
     return jsonDecode(row.columnAt(0));
   }
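
Usage sketch (not part of the patch): with the streaming iterator gone, callers drive the new step_progress API directly and reset the now-public progress_stmt around the iteration, as the for_checkpoint change in sync_status.rs above does. The walk_buckets helper below is hypothetical and only illustrates that calling pattern against the crate's StorageAdapter and ResultCode types.

// Sketch only: counts how many buckets have a persisted progress row,
// using the step_progress API introduced in this patch.
fn walk_buckets(adapter: &StorageAdapter) -> Result<usize, ResultCode> {
    // step_progress does not reset the statement up front (the removed
    // local_progress did), so callers reset before iterating, like the
    // new code in SyncDownloadProgress::for_checkpoint.
    adapter.progress_stmt.reset()?;

    let mut rows = 0;
    // Ok(Some(row)) is returned per bucket; Ok(None) marks the end and
    // resets the statement internally.
    while let Some(_row) = adapter.step_progress()? {
        rows += 1;
    }

    // Reset again so the prepared statement is never left busy for later
    // callers, mirroring the extra reset added in for_checkpoint.
    adapter.progress_stmt.reset()?;
    Ok(rows)
}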