From d95d8bfdb50c322a15456859ba563403a92399a8 Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Tue, 6 Feb 2024 16:31:45 +0100 Subject: [PATCH 1/5] introduce task queue --- dart/lib/src/sentry_client.dart | 10 ++- dart/lib/src/sentry_options.dart | 9 ++ dart/lib/src/transport/task_queue.dart | 27 ++++++ dart/test/transport/tesk_queue_test.dart | 100 +++++++++++++++++++++++ 4 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 dart/lib/src/transport/task_queue.dart create mode 100644 dart/test/transport/tesk_queue_test.dart diff --git a/dart/lib/src/sentry_client.dart b/dart/lib/src/sentry_client.dart index 709bda104f..53c0c19715 100644 --- a/dart/lib/src/sentry_client.dart +++ b/dart/lib/src/sentry_client.dart @@ -17,6 +17,7 @@ import 'sentry_stack_trace_factory.dart'; import 'transport/http_transport.dart'; import 'transport/noop_transport.dart'; import 'transport/spotlight_http_transport.dart'; +import 'transport/task_queue.dart'; import 'utils/isolate_utils.dart'; import 'version.dart'; import 'sentry_envelope.dart'; @@ -32,6 +33,10 @@ const _defaultIpAddress = '{{auto}}'; /// Logs crash reports and events to the Sentry.io service. class SentryClient { final SentryOptions _options; + late final _taskQueue = TaskQueue( + _options.maxQueueSize, + _options.logger, + ); final Random? _random; @@ -514,6 +519,9 @@ class SentryClient { Future _attachClientReportsAndSend(SentryEnvelope envelope) { final clientReport = _options.recorder.flush(); envelope.addClientReport(clientReport); - return _options.transport.send(envelope); + return _taskQueue.enqueue( + () => _options.transport.send(envelope), + SentryId.empty(), + ); } } diff --git a/dart/lib/src/sentry_options.dart b/dart/lib/src/sentry_options.dart index 918a5d5758..1f969fdf14 100644 --- a/dart/lib/src/sentry_options.dart +++ b/dart/lib/src/sentry_options.dart @@ -81,6 +81,15 @@ class SentryOptions { _maxSpans = maxSpans; } + int _maxQueueSize = 30; + + int get maxQueueSize => _maxQueueSize; + + set maxQueueSize(int count) { + assert(count > 0); + _maxQueueSize = count; + } + /// Configures up to which size request bodies should be included in events. /// This does not change whether an event is captured. MaxRequestBodySize maxRequestBodySize = MaxRequestBodySize.never; diff --git a/dart/lib/src/transport/task_queue.dart b/dart/lib/src/transport/task_queue.dart new file mode 100644 index 0000000000..70fa68c5cc --- /dev/null +++ b/dart/lib/src/transport/task_queue.dart @@ -0,0 +1,27 @@ +import 'dart:async'; + +import '../../sentry.dart'; + +typedef Task = Future Function(); + +class TaskQueue { + TaskQueue(this._maxQueueSize, this._logger); + + final int _maxQueueSize; + final SentryLogger _logger; + + int _queueCount = 0; + + Future enqueue(Task task, T fallbackResult) async { + if (_queueCount >= _maxQueueSize) { + _logger(SentryLevel.warning, + 'Task dropped due to backpressure. Avoid capturing in a tight loop.'); + return fallbackResult; + } else { + _queueCount++; + final result = await task(); + _queueCount--; + return result; + } + } +} diff --git a/dart/test/transport/tesk_queue_test.dart b/dart/test/transport/tesk_queue_test.dart new file mode 100644 index 0000000000..178ccc6e6f --- /dev/null +++ b/dart/test/transport/tesk_queue_test.dart @@ -0,0 +1,100 @@ +import 'dart:async'; + +import 'package:sentry/sentry.dart'; +import 'package:sentry/src/transport/task_queue.dart'; +import 'package:test/test.dart'; + +import '../mocks.dart'; + +void main() { + group("called sync", () { + late Fixture fixture; + + setUp(() { + fixture = Fixture(); + }); + + test("enqueue only executed `maxQueueSize` times when not awaiting", + () async { + final sut = fixture.getSut(5); + + var completedTasks = 0; + + for (int i = 0; i < 10; i++) { + sut.enqueue(() async { + print('Task $i'); + await Future.delayed(Duration(milliseconds: 1)); + completedTasks += 1; + return 1 + 1; + }, -1); + } + + // This will always await the other futures, even if they are running longer, as it was scheduled after them. + print('Started waiting for first 5 tasks'); + await Future.delayed(Duration(milliseconds: 1)); + print('Stopped waiting for first 5 tasks'); + + expect(completedTasks, 5); + }); + + test("enqueue picks up tasks again after await in-between", () async { + final sut = fixture.getSut(5); + + var completedTasks = 0; + + for (int i = 1; i <= 10; i++) { + sut.enqueue(() async { + print('Started task $i'); + await Future.delayed(Duration(milliseconds: 1)); + print('Completed task $i'); + completedTasks += 1; + return 1 + 1; + }, -1); + } + + print('Started waiting for first 5 tasks'); + await Future.delayed(Duration(milliseconds: 1)); + print('Stopped waiting for first 5 tasks'); + + for (int i = 6; i <= 15; i++) { + sut.enqueue(() async { + print('Started task $i'); + await Future.delayed(Duration(milliseconds: 1)); + print('Completed task $i'); + completedTasks += 1; + return 1 + 1; + }, -1); + } + + print('Started waiting for second 5 tasks'); + await Future.delayed(Duration(milliseconds: 5)); + print('Stopped waiting for second 5 tasks'); + + expect(completedTasks, 10); // 10 were dropped + }); + + test("enqueue executes all tasks when awaiting", () async { + final sut = fixture.getSut(5); + + var completedTasks = 0; + + for (int i = 0; i < 10; i++) { + await sut.enqueue(() async { + print('Task $i'); + await Future.delayed(Duration(milliseconds: 1)); + completedTasks += 1; + return 1 + 1; + }, -1); + } + expect(completedTasks, 10); + }); + }); +} + +class Fixture { + final options = SentryOptions(dsn: fakeDsn); + + TaskQueue getSut(int maxQueueSize) { + return TaskQueue(maxQueueSize, options.logger); + } +} From c96c3ce8df66abf1a59b8a84103aafbe16723c9a Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Mon, 4 Mar 2024 13:07:19 +0100 Subject: [PATCH 2/5] handle trow in task --- dart/lib/src/transport/task_queue.dart | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dart/lib/src/transport/task_queue.dart b/dart/lib/src/transport/task_queue.dart index 70fa68c5cc..386a4980f2 100644 --- a/dart/lib/src/transport/task_queue.dart +++ b/dart/lib/src/transport/task_queue.dart @@ -19,9 +19,11 @@ class TaskQueue { return fallbackResult; } else { _queueCount++; - final result = await task(); - _queueCount--; - return result; + try { + return await task(); + } finally { + _queueCount--; + } } } } From 5e94958a858cbb16a8b001b1589d28cf09af571c Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Mon, 4 Mar 2024 13:26:42 +0100 Subject: [PATCH 3/5] handle throwing tasks --- dart/test/transport/tesk_queue_test.dart | 38 +++++++++++++++++------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/dart/test/transport/tesk_queue_test.dart b/dart/test/transport/tesk_queue_test.dart index 178ccc6e6f..80dc97161c 100644 --- a/dart/test/transport/tesk_queue_test.dart +++ b/dart/test/transport/tesk_queue_test.dart @@ -16,17 +16,17 @@ void main() { test("enqueue only executed `maxQueueSize` times when not awaiting", () async { - final sut = fixture.getSut(5); + final sut = fixture.getSut(maxQueueSize: 5); var completedTasks = 0; for (int i = 0; i < 10; i++) { - sut.enqueue(() async { + unawaited(sut.enqueue(() async { print('Task $i'); await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1); + }, -1)); } // This will always await the other futures, even if they are running longer, as it was scheduled after them. @@ -38,18 +38,18 @@ void main() { }); test("enqueue picks up tasks again after await in-between", () async { - final sut = fixture.getSut(5); + final sut = fixture.getSut(maxQueueSize: 5); var completedTasks = 0; for (int i = 1; i <= 10; i++) { - sut.enqueue(() async { + unawaited(sut.enqueue(() async { print('Started task $i'); await Future.delayed(Duration(milliseconds: 1)); print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1); + }, -1)); } print('Started waiting for first 5 tasks'); @@ -57,13 +57,13 @@ void main() { print('Stopped waiting for first 5 tasks'); for (int i = 6; i <= 15; i++) { - sut.enqueue(() async { + unawaited(sut.enqueue(() async { print('Started task $i'); await Future.delayed(Duration(milliseconds: 1)); print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1); + }, -1)); } print('Started waiting for second 5 tasks'); @@ -74,7 +74,7 @@ void main() { }); test("enqueue executes all tasks when awaiting", () async { - final sut = fixture.getSut(5); + final sut = fixture.getSut(maxQueueSize: 5); var completedTasks = 0; @@ -88,13 +88,31 @@ void main() { } expect(completedTasks, 10); }); + + test("throwing tasks still execute as expected", () async { + final sut = fixture.getSut(maxQueueSize: 5); + + var completedTasks = 0; + + for (int i = 0; i < 10; i++) { + try { + await sut.enqueue(() async { + completedTasks += 1; + throw Error(); + }, -1); + } catch (_) { + // Ignore + } + } + expect(completedTasks, 10); + }); }); } class Fixture { final options = SentryOptions(dsn: fakeDsn); - TaskQueue getSut(int maxQueueSize) { + TaskQueue getSut({required int maxQueueSize}) { return TaskQueue(maxQueueSize, options.logger); } } From 6b0edd64aacdb068135b29c683e4cc0c6b1841fc Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Mon, 4 Mar 2024 13:40:47 +0100 Subject: [PATCH 4/5] Add documentation --- dart/lib/src/sentry_options.dart | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dart/lib/src/sentry_options.dart b/dart/lib/src/sentry_options.dart index 651bf97188..1de3a49d0f 100644 --- a/dart/lib/src/sentry_options.dart +++ b/dart/lib/src/sentry_options.dart @@ -83,8 +83,13 @@ class SentryOptions { int _maxQueueSize = 30; + /// Returns the max number of events Sentry will send when calling capture + /// methods in a tight loop. Default is 30. int get maxQueueSize => _maxQueueSize; + /// Sets how many unawaited events can be sent by Sentry. (e.g. capturing + /// events in a tight loop) at once. If you need to send more, please use the + /// await keyword. set maxQueueSize(int count) { assert(count > 0); _maxQueueSize = count; From 6c70f4dfbf4c999ab06be52624591487c70c035c Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Mon, 4 Mar 2024 13:43:49 +0100 Subject: [PATCH 5/5] add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8377034385..1ef90672d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features - Use `recordHttpBreadcrumbs` to set iOS `enableNetworkBreadcrumbs` ([#1884](https://github.com/getsentry/sentry-dart/pull/1884)) +- Add `maxQueueSize` to limit the number of unawaited events sent to Sentry ([#1868]((https://github.com/getsentry/sentry-dart/pull/1868)) ## 7.16.1