From 996beeb6971d85088f6234fe558b3ecb161d9760 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 5 Feb 2025 12:57:45 +0100 Subject: [PATCH 1/2] Add tests for stream throttling --- .../lib/src/update_notification.dart | 25 ++-- packages/sqlite_async/pubspec.yaml | 1 + .../test/update_notification_test.dart | 134 ++++++++++++++++++ 3 files changed, 148 insertions(+), 12 deletions(-) create mode 100644 packages/sqlite_async/test/update_notification_test.dart diff --git a/packages/sqlite_async/lib/src/update_notification.dart b/packages/sqlite_async/lib/src/update_notification.dart index 8141df9..7832e32 100644 --- a/packages/sqlite_async/lib/src/update_notification.dart +++ b/packages/sqlite_async/lib/src/update_notification.dart @@ -62,12 +62,8 @@ class UpdateNotification { static StreamTransformer filterTablesTransformer(Iterable tables) { Set normalized = {for (var table in tables) table.toLowerCase()}; - return StreamTransformer.fromHandlers(handleData: (data, sink) { - if (data.containsAny(normalized)) { - sink.add(data); - } - }); + return StreamTransformer.fromBind( + (source) => source.where((data) => data.containsAny(normalized))); } } @@ -77,21 +73,22 @@ class UpdateNotification { /// Behaviour: /// If there was no event in "timeout", and one comes in, it is pushed immediately. /// Otherwise, we wait until the timeout is over. -Stream _throttleStream(Stream input, Duration timeout, +Stream _throttleStream(Stream input, Duration timeout, {bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* { var nextPing = Completer(); + var done = false; T? lastData; var listener = input.listen((data) { - if (lastData is T && add != null) { - lastData = add(lastData as T, data); + if (lastData != null && add != null) { + lastData = add(lastData!, data); } else { lastData = data; } if (!nextPing.isCompleted) { nextPing.complete(); } - }); + }, onDone: () => done = true); try { if (addOne != null) { @@ -100,7 +97,7 @@ Stream _throttleStream(Stream input, Duration timeout, if (throttleFirst) { await Future.delayed(timeout); } - while (true) { + while (!done) { // If a value is available now, we'll use it immediately. // If not, this waits for it. await nextPing.future; @@ -114,6 +111,10 @@ Stream _throttleStream(Stream input, Duration timeout, await Future.delayed(timeout); } } finally { - listener.cancel(); + if (lastData case final data?) { + yield data; + } + + await listener.cancel(); } } diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index d333166..8aac3d6 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -34,6 +34,7 @@ dev_dependencies: stream_channel: ^2.1.2 path: ^1.9.0 test_descriptor: ^2.0.2 + fake_async: ^1.3.3 platforms: android: diff --git a/packages/sqlite_async/test/update_notification_test.dart b/packages/sqlite_async/test/update_notification_test.dart new file mode 100644 index 0000000..1410862 --- /dev/null +++ b/packages/sqlite_async/test/update_notification_test.dart @@ -0,0 +1,134 @@ +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:test/test.dart'; + +void main() { + group('Update notifications', () { + const timeout = Duration(seconds: 10); + const halfTimeout = Duration(seconds: 5); + + group('throttle', () { + test('can add initial', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout, + addOne: UpdateNotification({'a'})).listen(events.add); + + control.flushMicrotasks(); + expect(events, hasLength(1)); + control.elapse(halfTimeout); + + source.add(UpdateNotification({'b'})); + expect(events, hasLength(1)); // Still a delay from the initial one + + control.elapse(halfTimeout); + expect(events, hasLength(2)); + }); + }); + + test('sends events after initial throttle', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + expect(events, isEmpty); + + control.elapse(halfTimeout); + expect(events, hasLength(1)); + }); + }); + + test('merges events', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + expect(events, isEmpty); + + source.add(UpdateNotification({'b'})); + control.elapse(halfTimeout); + expect(events, [ + UpdateNotification({'a', 'b'}) + ]); + }); + }); + + test('forwards cancellations', () { + fakeAsync((control) { + var cancelled = false; + final source = StreamController(sync: true) + ..onCancel = () => cancelled = true; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen((_) => fail('unexpected event'), + onDone: () => fail('unexpected done')); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + + sub.cancel(); + control.flushTimers(); + + expect(cancelled, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + + test('closes when source closes', () { + fakeAsync((control) { + final source = StreamController(sync: true) + ..onCancel = () => Future.value(); + final events = []; + var done = false; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add, onDone: () => done = true); + + source + // These two are combined due to throttleFirst + ..add(UpdateNotification({'a'})) + ..add(UpdateNotification({'b'})) + ..close(); + + control.flushTimers(); + expect(events, [ + UpdateNotification({'a', 'b'}) + ]); + expect(done, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + }); + + test('filter tables', () async { + final source = StreamController(sync: true); + final events = []; + final subscription = UpdateNotification.filterTablesTransformer(['a']) + .bind(source.stream) + .listen(events.add); + + source.add(UpdateNotification({'a', 'b'})); + expect(events, hasLength(1)); + + source.add(UpdateNotification({'b'})); + expect(events, hasLength(1)); + + await subscription.cancel(); + expect(source.hasListener, isFalse); + }); + }); +} From 3a0b54a0ae897900634e6f00b7585a17336a8422 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 5 Feb 2025 13:16:06 +0100 Subject: [PATCH 2/2] Fix closing after delay --- .../lib/src/update_notification.dart | 10 +++++++++- .../test/update_notification_test.dart | 20 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/packages/sqlite_async/lib/src/update_notification.dart b/packages/sqlite_async/lib/src/update_notification.dart index 7832e32..0c8f2c6 100644 --- a/packages/sqlite_async/lib/src/update_notification.dart +++ b/packages/sqlite_async/lib/src/update_notification.dart @@ -88,7 +88,13 @@ Stream _throttleStream(Stream input, Duration timeout, if (!nextPing.isCompleted) { nextPing.complete(); } - }, onDone: () => done = true); + }, onDone: () { + if (!nextPing.isCompleted) { + nextPing.complete(); + } + + done = true; + }); try { if (addOne != null) { @@ -101,6 +107,8 @@ Stream _throttleStream(Stream input, Duration timeout, // If a value is available now, we'll use it immediately. // If not, this waits for it. await nextPing.future; + if (done) break; + // Capture any new values coming in while we wait. nextPing = Completer(); T data = lastData as T; diff --git a/packages/sqlite_async/test/update_notification_test.dart b/packages/sqlite_async/test/update_notification_test.dart index 1410862..0a00ccb 100644 --- a/packages/sqlite_async/test/update_notification_test.dart +++ b/packages/sqlite_async/test/update_notification_test.dart @@ -112,6 +112,26 @@ void main() { expect(control.pendingTimers, isEmpty); }); }); + + test('closes when source closes after delay', () { + fakeAsync((control) { + final source = StreamController(sync: true) + ..onCancel = () => Future.value(); + final events = []; + var done = false; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add, onDone: () => done = true); + + control.elapse(const Duration(hours: 1)); + source.close(); + + control.flushTimers(); + expect(events, isEmpty); + expect(done, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); }); test('filter tables', () async {