Skip to content

Commit d267563

Browse files
committed
Fix crudMutex and uploading status
1 parent b492be8 commit d267563

File tree

1 file changed

+41
-38
lines changed

1 file changed

+41
-38
lines changed

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -164,49 +164,52 @@ class StreamingSyncImplementation {
164164
}
165165

166166
Future<void> uploadAllCrud() async {
167-
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
168-
CrudEntry? checkedCrudItem;
169-
170-
while (true) {
171-
try {
172-
// This is the first item in the FIFO CRUD queue.
173-
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
174-
if (nextCrudItem != null) {
175-
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
176-
// This will force a higher log level than exceptions which are caught here.
177-
isolateLogger.warning(
178-
"""Potentially previously uploaded CRUD entries are still present in the upload queue.
167+
return crudMutex.lock(() async {
168+
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
169+
CrudEntry? checkedCrudItem;
170+
171+
while (true) {
172+
_updateStatus(uploading: true);
173+
try {
174+
// This is the first item in the FIFO CRUD queue.
175+
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
176+
if (nextCrudItem != null) {
177+
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
178+
// This will force a higher log level than exceptions which are caught here.
179+
isolateLogger.warning(
180+
"""Potentially previously uploaded CRUD entries are still present in the upload queue.
179181
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
180182
The next upload iteration will be delayed.""");
181-
throw Exception(
182-
'Delaying due to previously encountered CRUD item.');
183-
}
183+
throw Exception(
184+
'Delaying due to previously encountered CRUD item.');
185+
}
184186

185-
checkedCrudItem = nextCrudItem;
186-
await uploadCrud();
187-
_updateStatus(uploadError: _noError);
188-
} else {
189-
// Uploading is completed
190-
await adapter.updateLocalTarget(() => getWriteCheckpoint());
191-
break;
192-
}
193-
} catch (e, stacktrace) {
194-
checkedCrudItem = null;
195-
isolateLogger.warning('Data upload error', e, stacktrace);
196-
_updateStatus(uploading: false, uploadError: e);
197-
await Future.delayed(retryDelay);
198-
if (!isConnected) {
199-
// Exit the upload loop if the sync stream is no longer connected
200-
break;
187+
checkedCrudItem = nextCrudItem;
188+
await uploadCrud();
189+
_updateStatus(uploadError: _noError);
190+
} else {
191+
// Uploading is completed
192+
await adapter.updateLocalTarget(() => getWriteCheckpoint());
193+
break;
194+
}
195+
} catch (e, stacktrace) {
196+
checkedCrudItem = null;
197+
isolateLogger.warning('Data upload error', e, stacktrace);
198+
_updateStatus(uploading: false, uploadError: e);
199+
await Future.delayed(retryDelay);
200+
if (!isConnected) {
201+
// Exit the upload loop if the sync stream is no longer connected
202+
break;
203+
}
204+
isolateLogger.warning(
205+
"Caught exception when uploading. Upload will retry after a delay",
206+
e,
207+
stacktrace);
208+
} finally {
209+
_updateStatus(uploading: false);
201210
}
202-
isolateLogger.warning(
203-
"Caught exception when uploading. Upload will retry after a delay",
204-
e,
205-
stacktrace);
206-
} finally {
207-
_updateStatus(uploading: false);
208211
}
209-
}
212+
});
210213
}
211214

212215
Future<String> getWriteCheckpoint() async {

0 commit comments

Comments
 (0)