Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv
const_format = "0.2.34"
futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] }
rustc-hash = { version = "2.1", default-features = false }
streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }

[dependencies.uuid]
version = "1.4.1"
Expand Down
51 changes: 18 additions & 33 deletions crates/core/src/sync/storage_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use core::{assert_matches::debug_assert_matches, fmt::Display};
use alloc::{string::ToString, vec::Vec};
use serde::Serialize;
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
use streaming_iterator::StreamingIterator;

use crate::{
error::SQLiteError,
Expand All @@ -25,7 +24,7 @@ use super::{
/// used frequently as an optimization, but we're not taking advantage of that yet.
pub struct StorageAdapter {
pub db: *mut sqlite::sqlite3,
progress_stmt: ManagedStmt,
pub progress_stmt: ManagedStmt,
time_stmt: ManagedStmt,
}

Expand Down Expand Up @@ -78,37 +77,22 @@ impl StorageAdapter {
Ok(())
}

pub fn local_progress(
&self,
) -> Result<
impl StreamingIterator<Item = Result<PersistedBucketProgress, ResultCode>>,
ResultCode,
> {
self.progress_stmt.reset()?;

fn step(stmt: &ManagedStmt) -> Result<Option<PersistedBucketProgress>, ResultCode> {
if stmt.step()? == ResultCode::ROW {
let bucket = stmt.column_text(0)?;
let count_at_last = stmt.column_int64(1);
let count_since_last = stmt.column_int64(2);

return Ok(Some(PersistedBucketProgress {
bucket,
count_at_last,
count_since_last,
}));
}

pub fn step_progress(&self) -> Result<Option<PersistedBucketProgress>, ResultCode> {
if self.progress_stmt.step()? == ResultCode::ROW {
let bucket = self.progress_stmt.column_text(0)?;
let count_at_last = self.progress_stmt.column_int64(1);
let count_since_last = self.progress_stmt.column_int64(2);

Ok(Some(PersistedBucketProgress {
bucket,
count_at_last,
count_since_last,
}))
} else {
// Done
self.progress_stmt.reset()?;
Ok(None)
}

Ok(streaming_iterator::from_fn(|| {
match step(&self.progress_stmt) {
Err(e) => Some(Err(e)),
Ok(Some(other)) => Some(Ok(other)),
Ok(None) => None,
}
}))
}

pub fn reset_progress(&self) -> Result<(), ResultCode> {
Expand Down Expand Up @@ -239,10 +223,11 @@ impl StorageAdapter {
}

pub fn now(&self) -> Result<Timestamp, ResultCode> {
self.time_stmt.reset()?;
self.time_stmt.step()?;
let res = Timestamp(self.time_stmt.column_int64(0));
self.time_stmt.reset()?;

Ok(Timestamp(self.time_stmt.column_int64(0)))
Ok(res)
}
}

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,10 @@ impl StreamingSyncIteration {
&self,
checkpoint: &OwnedCheckpoint,
) -> Result<SyncDownloadProgress, SQLiteError> {
let local_progress = self.adapter.local_progress()?;
let SyncProgressFromCheckpoint {
progress,
needs_counter_reset,
} = SyncDownloadProgress::for_checkpoint(checkpoint, local_progress)?;
} = SyncDownloadProgress::for_checkpoint(checkpoint, &self.adapter)?;

if needs_counter_reset {
self.adapter.reset_progress()?;
Expand Down
20 changes: 10 additions & 10 deletions crates/core/src/sync/sync_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use core::{cell::RefCell, hash::BuildHasher};
use rustc_hash::FxBuildHasher;
use serde::Serialize;
use sqlite_nostd::ResultCode;
use streaming_iterator::StreamingIterator;

use crate::sync::storage_adapter::StorageAdapter;

use super::{
bucket_priority::BucketPriority, interface::Instruction, line::DataLine,
storage_adapter::PersistedBucketProgress, streaming_sync::OwnedCheckpoint,
streaming_sync::OwnedCheckpoint,
};

/// Information about a progressing download.
Expand Down Expand Up @@ -187,9 +188,7 @@ pub struct SyncProgressFromCheckpoint {
impl SyncDownloadProgress {
pub fn for_checkpoint<'a>(
checkpoint: &OwnedCheckpoint,
mut local_progress: impl StreamingIterator<
Item = Result<PersistedBucketProgress<'a>, ResultCode>,
>,
adapter: &StorageAdapter,
) -> Result<SyncProgressFromCheckpoint, ResultCode> {
let mut buckets = BTreeMap::<String, BucketProgress>::new();
let mut needs_reset = false;
Expand All @@ -206,12 +205,11 @@ impl SyncDownloadProgress {
);
}

while let Some(row) = local_progress.next() {
let row = match row {
Ok(row) => row,
Err(e) => return Err(*e),
};
// Go through local bucket states to detect pending progress from previous sync iterations
// that may have been interrupted.
adapter.progress_stmt.reset()?;

while let Some(row) = adapter.step_progress()? {
let Some(progress) = buckets.get_mut(row.bucket) else {
continue;
};
Expand All @@ -232,6 +230,8 @@ impl SyncDownloadProgress {
}
}

adapter.progress_stmt.reset()?;

Ok(SyncProgressFromCheckpoint {
progress: Self { buckets },
needs_counter_reset: needs_reset,
Expand Down
8 changes: 8 additions & 0 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ void _syncTests<T>({
db.execute('begin');
final [row] =
db.select('SELECT powersync_control(?, ?)', [operation, data]);

// Make sure that powersync_control doesn't leave any busy statements
// behind.
// TODO: Re-enable after we can guarantee sqlite_stmt being available
// const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;';
// final busy = db.select(statement, [statement]);
// expect(busy, isEmpty);

db.execute('commit');
return jsonDecode(row.columnAt(0));
}
Expand Down