@@ -208,11 +208,13 @@ impl StreamingSyncIteration {
208208 }
209209
210210 async fn run ( mut self ) -> Result < ( ) , SQLiteError > {
211- let mut validated = None :: < OwnedCheckpoint > ;
212- let mut applied = None :: < OwnedCheckpoint > ;
213-
214211 let mut target = SyncTarget :: BeforeCheckpoint ( self . prepare_request ( ) . await ?) ;
215212
213+ // A checkpoint that has been fully received and validated, but couldn't be applied due to
214+ // pending local data. We will retry applying this checkpoint when the client SDK informs us
215+ // that it has finished uploading changes.
216+ let mut validated_but_not_applied = None :: < OwnedCheckpoint > ;
217+
216218 loop {
217219 let event = Self :: receive_event ( ) . await ;
218220
@@ -223,14 +225,44 @@ impl StreamingSyncIteration {
223225 SyncEvent :: TearDown => break ,
224226 SyncEvent :: TextLine { data } => serde_json:: from_str ( data) ?,
225227 SyncEvent :: BinaryLine { data } => bson:: from_bytes ( data) ?,
228+ SyncEvent :: UploadFinished => {
229+ if let Some ( checkpoint) = validated_but_not_applied. take ( ) {
230+ let result = self . adapter . sync_local ( & checkpoint, None ) ?;
231+
232+ match result {
233+ SyncLocalResult :: ChangesApplied => {
234+ event. instructions . push ( Instruction :: LogLine {
235+ severity : LogSeverity :: DEBUG ,
236+ line : "Applied pending checkpoint after completed upload"
237+ . into ( ) ,
238+ } ) ;
239+
240+ self . handle_checkpoint_applied ( & checkpoint, event) ?;
241+ }
242+ _ => {
243+ event. instructions . push ( Instruction :: LogLine {
244+ severity : LogSeverity :: WARNING ,
245+ line : "Could not apply pending checkpoint even after completed upload"
246+ . into ( ) ,
247+ } ) ;
248+ }
249+ }
250+ }
251+
252+ continue ;
253+ }
226254 SyncEvent :: DidRefreshToken => {
227255 // Break so that the client SDK starts another iteration.
228256 break ;
229257 }
230258 } ;
231259
260+ self . status
261+ . update ( |s| s. mark_connected ( ) , & mut event. instructions ) ;
262+
232263 match line {
233264 SyncLine :: Checkpoint ( checkpoint) => {
265+ validated_but_not_applied = None ;
234266 let to_delete = target. track_checkpoint ( & checkpoint) ;
235267
236268 self . adapter
@@ -252,6 +284,7 @@ impl StreamingSyncIteration {
252284 } ;
253285
254286 target. apply_diff ( & diff) ;
287+ validated_but_not_applied = None ;
255288 self . adapter
256289 . delete_buckets ( diff. removed_buckets . iter ( ) . copied ( ) ) ?;
257290
@@ -280,29 +313,25 @@ impl StreamingSyncIteration {
280313 // await new Promise((resolve) => setTimeout(resolve, 50));
281314 event. instructions . push ( Instruction :: LogLine {
282315 severity : LogSeverity :: WARNING ,
283- line : format ! ( "Could not apply checkpoint, {checkpoint_result}" ) ,
316+ line : format ! ( "Could not apply checkpoint, {checkpoint_result}" )
317+ . into ( ) ,
284318 } ) ;
285319 break ;
286320 }
287321 SyncLocalResult :: PendingLocalChanges => {
288322 event. instructions . push ( Instruction :: LogLine {
289- severity : LogSeverity :: WARNING ,
290- line : format ! ( "TODO: Await pending uploads and try again" ) ,
291- } ) ;
292- break ;
323+ severity : LogSeverity :: INFO ,
324+ line : "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint." . into ( ) ,
325+ } ) ;
326+
327+ validated_but_not_applied = Some ( target. clone ( ) ) ;
293328 }
294329 SyncLocalResult :: ChangesApplied => {
295330 event. instructions . push ( Instruction :: LogLine {
296331 severity : LogSeverity :: DEBUG ,
297- line : format ! ( "Validated and applied checkpoint" ) ,
332+ line : "Validated and applied checkpoint" . into ( ) ,
298333 } ) ;
299- event. instructions . push ( Instruction :: DidCompleteSync { } ) ;
300-
301- let now = self . adapter . now ( ) ?;
302- self . status . update (
303- |status| status. applied_checkpoint ( target, now) ,
304- & mut event. instructions ,
305- ) ;
334+ self . handle_checkpoint_applied ( target, event) ?;
306335 }
307336 }
308337 }
@@ -328,7 +357,8 @@ impl StreamingSyncIteration {
328357 severity : LogSeverity :: WARNING ,
329358 line : format ! (
330359 "Could not apply partial checkpoint, {checkpoint_result}"
331- ) ,
360+ )
361+ . into ( ) ,
332362 } ) ;
333363 break ;
334364 }
@@ -407,6 +437,22 @@ impl StreamingSyncIteration {
407437 . push ( Instruction :: EstablishSyncStream { request } ) ;
408438 Ok ( local_bucket_names)
409439 }
440+
441+ fn handle_checkpoint_applied (
442+ & mut self ,
443+ target : & OwnedCheckpoint ,
444+ event : & mut ActiveEvent ,
445+ ) -> Result < ( ) , ResultCode > {
446+ event. instructions . push ( Instruction :: DidCompleteSync { } ) ;
447+
448+ let now = self . adapter . now ( ) ?;
449+ self . status . update (
450+ |status| status. applied_checkpoint ( target, now) ,
451+ & mut event. instructions ,
452+ ) ;
453+
454+ Ok ( ( ) )
455+ }
410456}
411457
412458#[ derive( Debug ) ]
@@ -451,7 +497,7 @@ impl SyncTarget {
451497 }
452498}
453499
454- #[ derive( Debug ) ]
500+ #[ derive( Debug , Clone ) ]
455501pub struct OwnedCheckpoint {
456502 pub last_op_id : i64 ,
457503 pub write_checkpoint : Option < i64 > ,
@@ -485,7 +531,7 @@ impl OwnedCheckpoint {
485531 }
486532}
487533
488- #[ derive( Debug ) ]
534+ #[ derive( Debug , Clone ) ]
489535pub struct OwnedBucketChecksum {
490536 pub bucket : String ,
491537 pub checksum : i32 ,
0 commit comments