@@ -27,6 +27,13 @@ class StreamingSyncImplementation {
27
27
28
28
final Future <void > Function () uploadCrud;
29
29
30
+ // An internal controller which is used to trigger CRUD uploads internally
31
+ // e.g. when reconnecting.
32
+ // This is only a broadcast controller since the `crudLoop` method is public
33
+ // and could potentially be called multiple times externally.
34
+ final StreamController <Null > _internalCrudTriggerController =
35
+ StreamController <Null >.broadcast ();
36
+
30
37
final Stream crudUpdateTriggerStream;
31
38
32
39
final StreamController <SyncStatus > _statusStreamController =
@@ -92,6 +99,9 @@ class StreamingSyncImplementation {
92
99
if (_safeToClose) {
93
100
_client.close ();
94
101
}
102
+
103
+ await _internalCrudTriggerController.close ();
104
+
95
105
// wait for completeAbort() to be called
96
106
await future;
97
107
@@ -144,7 +154,7 @@ class StreamingSyncImplementation {
144
154
145
155
// On error, wait a little before retrying
146
156
// When aborting, don't wait
147
- await Future . any ([ Future . delayed (retryDelay), _abort ! .onAbort] );
157
+ await _delayRetry ( );
148
158
}
149
159
}
150
160
} finally {
@@ -155,10 +165,14 @@ class StreamingSyncImplementation {
155
165
Future <void > crudLoop () async {
156
166
await uploadAllCrud ();
157
167
158
- await for (var _ in crudUpdateTriggerStream) {
159
- if (_abort? .aborted == true ) {
160
- break ;
161
- }
168
+ // Trigger a CRUD upload whenever the upstream trigger fires
169
+ // as-well-as whenever the sync stream reconnects.
170
+ // This has the potential (in rare cases) to affect the crudThrottleTime,
171
+ // but it should not result in excessive uploads since the
172
+ // sync reconnects are also throttled.
173
+ // The stream here is closed on abort.
174
+ await for (var _ in mergeStreams (
175
+ [crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
162
176
await uploadAllCrud ();
163
177
}
164
178
}
@@ -170,6 +184,13 @@ class StreamingSyncImplementation {
170
184
171
185
while (true ) {
172
186
try {
187
+ // It's possible that an abort or disconnect operation could
188
+ // be followed by a `close` operation. The close would cause these
189
+ // operations, which use the DB, to throw an exception. Breaking the loop
190
+ // here prevents unnecessary potential (caught) exceptions.
191
+ if (aborted) {
192
+ break ;
193
+ }
173
194
// This is the first item in the FIFO CRUD queue.
174
195
CrudEntry ? nextCrudItem = await adapter.nextCrudItem ();
175
196
if (nextCrudItem != null ) {
@@ -196,7 +217,7 @@ class StreamingSyncImplementation {
196
217
checkedCrudItem = null ;
197
218
isolateLogger.warning ('Data upload error' , e, stacktrace);
198
219
_updateStatus (uploading: false , uploadError: e);
199
- await Future . delayed (retryDelay );
220
+ await _delayRetry ( );
200
221
if (! isConnected) {
201
222
// Exit the upload loop if the sync stream is no longer connected
202
223
break ;
@@ -298,6 +319,9 @@ class StreamingSyncImplementation {
298
319
Future <void >? credentialsInvalidation;
299
320
bool haveInvalidated = false ;
300
321
322
+ // Trigger a CRUD upload on reconnect
323
+ _internalCrudTriggerController.add (null );
324
+
301
325
await for (var line in merged) {
302
326
if (aborted) {
303
327
break ;
@@ -465,6 +489,12 @@ class StreamingSyncImplementation {
465
489
yield parseStreamingSyncLine (line as Map <String , dynamic >);
466
490
}
467
491
}
492
+
493
+ /// Delays the standard `retryDelay` Duration, but exits early if
494
+ /// an abort has been requested.
495
+ Future <void > _delayRetry () async {
496
+ await Future .any ([Future .delayed (retryDelay), _abort! .onAbort]);
497
+ }
468
498
}
469
499
470
500
/// Attempt to give a basic summary of the error for cases where the full error
0 commit comments