From 42d6616ff8b41becd46f0f356e052589408eef1f Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Wed, 23 Oct 2024 11:42:19 +0200 Subject: [PATCH 1/4] Handle backpressure earlier in pipeline --- dart/lib/src/sentry.dart | 62 ++++++++++++++++++-------- dart/lib/src/sentry_client.dart | 10 +---- dart/lib/src/transport/task_queue.dart | 25 +++++++++-- 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/dart/lib/src/sentry.dart b/dart/lib/src/sentry.dart index 9ff45afcb4..4286f818f2 100644 --- a/dart/lib/src/sentry.dart +++ b/dart/lib/src/sentry.dart @@ -24,6 +24,7 @@ import 'sentry_options.dart'; import 'sentry_user_feedback.dart'; import 'tracing.dart'; import 'sentry_attachment/sentry_attachment.dart'; +import 'transport/task_queue.dart'; /// Configuration options callback typedef OptionsConfiguration = FutureOr Function(SentryOptions); @@ -34,6 +35,7 @@ typedef AppRunner = FutureOr Function(); /// Sentry SDK main entry point class Sentry { static Hub _hub = NoOpHub(); + static TaskQueue _taskQueue = NoOpTaskQueue(); Sentry._(); @@ -56,6 +58,10 @@ class Sentry { if (config is Future) { await config; } + _taskQueue = DefaultTaskQueue( + sentryOptions.maxQueueSize, + sentryOptions.logger, + ); } catch (exception, stackTrace) { sentryOptions.logger( SentryLevel.error, @@ -181,11 +187,15 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureEvent( - event, - stackTrace: stackTrace, - hint: hint, - withScope: withScope, + _taskQueue.enqueue( + () => _hub.captureEvent( + event, + stackTrace: stackTrace, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + 'captureEvent', ); /// Reports the [throwable] and optionally its [stackTrace] to Sentry.io. @@ -195,11 +205,15 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureException( - throwable, - stackTrace: stackTrace, - hint: hint, - withScope: withScope, + _taskQueue.enqueue( + () => _hub.captureException( + throwable, + stackTrace: stackTrace, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + 'captureException', ); /// Reports a [message] to Sentry.io. @@ -211,13 +225,17 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureMessage( - message, - level: level, - template: template, - params: params, - hint: hint, - withScope: withScope, + _taskQueue.enqueue( + () => _hub.captureMessage( + message, + level: level, + template: template, + params: params, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + 'captureMessage', ); /// Reports a [userFeedback] to Sentry.io. @@ -236,7 +254,15 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureFeedback(feedback, hint: hint, withScope: withScope); + _taskQueue.enqueue( + () => _hub.captureFeedback( + feedback, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + 'captureFeedback', + ); /// Close the client SDK static Future close() async { diff --git a/dart/lib/src/sentry_client.dart b/dart/lib/src/sentry_client.dart index 43e5e0be9d..0ff4f49af7 100644 --- a/dart/lib/src/sentry_client.dart +++ b/dart/lib/src/sentry_client.dart @@ -25,7 +25,6 @@ import 'transport/http_transport.dart'; import 'transport/noop_transport.dart'; import 'transport/rate_limiter.dart'; import 'transport/spotlight_http_transport.dart'; -import 'transport/task_queue.dart'; import 'utils/isolate_utils.dart'; import 'utils/regex_utils.dart'; import 'utils/stacktrace_utils.dart'; @@ -39,10 +38,6 @@ const _defaultIpAddress = '{{auto}}'; /// Logs crash reports and events to the Sentry.io service. class SentryClient { final SentryOptions _options; - late final _taskQueue = TaskQueue( - _options.maxQueueSize, - _options.logger, - ); final Random? _random; @@ -630,9 +625,6 @@ class SentryClient { Future _attachClientReportsAndSend(SentryEnvelope envelope) { final clientReport = _options.recorder.flush(); envelope.addClientReport(clientReport); - return _taskQueue.enqueue( - () => _options.transport.send(envelope), - SentryId.empty(), - ); + return _options.transport.send(envelope); } } diff --git a/dart/lib/src/transport/task_queue.dart b/dart/lib/src/transport/task_queue.dart index 386a4980f2..bbd9d2f4c8 100644 --- a/dart/lib/src/transport/task_queue.dart +++ b/dart/lib/src/transport/task_queue.dart @@ -1,21 +1,30 @@ import 'dart:async'; +import 'package:meta/meta.dart'; + import '../../sentry.dart'; typedef Task = Future Function(); -class TaskQueue { - TaskQueue(this._maxQueueSize, this._logger); +@internal +abstract class TaskQueue { + Future enqueue(Task task, T fallbackResult, String warning); +} + +@internal +class DefaultTaskQueue implements TaskQueue { + DefaultTaskQueue(this._maxQueueSize, this._logger); final int _maxQueueSize; final SentryLogger _logger; int _queueCount = 0; - Future enqueue(Task task, T fallbackResult) async { + @override + Future enqueue(Task task, T fallbackResult, String taskName) async { if (_queueCount >= _maxQueueSize) { _logger(SentryLevel.warning, - 'Task dropped due to backpressure. Avoid capturing in a tight loop.'); + '$taskName dropped due to backpressure. Avoid capturing in a tight loop.'); return fallbackResult; } else { _queueCount++; @@ -27,3 +36,11 @@ class TaskQueue { } } } + +@internal +class NoOpTaskQueue implements TaskQueue { + @override + Future enqueue(Task task, T fallbackResult, String warning) { + return task(); + } +} From d40ffa730e21949c36e5b04496ff6c3b577fb28d Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Wed, 23 Oct 2024 11:52:13 +0200 Subject: [PATCH 2/4] add changelog entry --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73e86ff0f4..505314c6f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,10 @@ ### Enhancements - Cache parsed DSN ([#2365](https://github.com/getsentry/sentry-dart/pull/2365)) - +- Handle backpressure earlier in pipeline ([#2371](https://github.com/getsentry/sentry-dart/pull/2371)) + - Drops max un-awaited parallel tasks earlier, so event processors & callbacks are not executed for them. + - Change by setting `SentryOptions.maxQueueSize`. Default is 30. + ## 8.10.0-beta.2 ### Fixes From 05bc7c529ca78fcedfb4b2047505020d6990904f Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Wed, 23 Oct 2024 13:23:35 +0200 Subject: [PATCH 3/4] fix task_queue tests --- dart/test/transport/tesk_queue_test.dart | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dart/test/transport/tesk_queue_test.dart b/dart/test/transport/tesk_queue_test.dart index af22672d97..1a7b18c1f5 100644 --- a/dart/test/transport/tesk_queue_test.dart +++ b/dart/test/transport/tesk_queue_test.dart @@ -25,7 +25,7 @@ void main() { await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1)); + }, -1, 'foo')); } // This will always await the other futures, even if they are running longer, as it was scheduled after them. @@ -48,7 +48,7 @@ void main() { print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1)); + }, -1, 'foo')); } print('Started waiting for first 5 tasks'); @@ -62,7 +62,7 @@ void main() { print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1)); + }, -1, 'foo')); } print('Started waiting for second 5 tasks'); @@ -83,7 +83,7 @@ void main() { await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1); + }, -1, 'foo'); } expect(completedTasks, 10); }); @@ -98,7 +98,7 @@ void main() { await sut.enqueue(() async { completedTasks += 1; throw Error(); - }, -1); + }, -1, 'foo'); } catch (_) { // Ignore } @@ -112,6 +112,6 @@ class Fixture { final options = defaultTestOptions(); TaskQueue getSut({required int maxQueueSize}) { - return TaskQueue(maxQueueSize, options.logger); + return DefaultTaskQueue(maxQueueSize, options.logger); } } From e0e0f7b1f3742d71597dbf7a3b0f180c487c996c Mon Sep 17 00:00:00 2001 From: Denis Andrasec Date: Wed, 23 Oct 2024 14:12:49 +0200 Subject: [PATCH 4/4] record dropped events --- dart/lib/src/sentry.dart | 29 +++++++++-------- dart/lib/src/transport/task_queue.dart | 27 ++++++++++++---- dart/test/transport/tesk_queue_test.dart | 40 ++++++++++++++++++++---- 3 files changed, 71 insertions(+), 25 deletions(-) diff --git a/dart/lib/src/sentry.dart b/dart/lib/src/sentry.dart index 4286f818f2..cd89d2fd4a 100644 --- a/dart/lib/src/sentry.dart +++ b/dart/lib/src/sentry.dart @@ -24,6 +24,7 @@ import 'sentry_options.dart'; import 'sentry_user_feedback.dart'; import 'tracing.dart'; import 'sentry_attachment/sentry_attachment.dart'; +import 'transport/data_category.dart'; import 'transport/task_queue.dart'; /// Configuration options callback @@ -61,6 +62,7 @@ class Sentry { _taskQueue = DefaultTaskQueue( sentryOptions.maxQueueSize, sentryOptions.logger, + sentryOptions.recorder, ); } catch (exception, stackTrace) { sentryOptions.logger( @@ -188,15 +190,16 @@ class Sentry { ScopeCallback? withScope, }) => _taskQueue.enqueue( - () => _hub.captureEvent( - event, - stackTrace: stackTrace, - hint: hint, - withScope: withScope, - ), - SentryId.empty(), - 'captureEvent', - ); + () => _hub.captureEvent( + event, + stackTrace: stackTrace, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + event.type != null + ? DataCategory.fromItemType(event.type!) + : DataCategory.unknown); /// Reports the [throwable] and optionally its [stackTrace] to Sentry.io. static Future captureException( @@ -213,7 +216,7 @@ class Sentry { withScope: withScope, ), SentryId.empty(), - 'captureException', + DataCategory.error, ); /// Reports a [message] to Sentry.io. @@ -235,7 +238,7 @@ class Sentry { withScope: withScope, ), SentryId.empty(), - 'captureMessage', + DataCategory.unknown, ); /// Reports a [userFeedback] to Sentry.io. @@ -261,7 +264,7 @@ class Sentry { withScope: withScope, ), SentryId.empty(), - 'captureFeedback', + DataCategory.unknown, ); /// Close the client SDK @@ -277,7 +280,7 @@ class Sentry { /// Last event id recorded by the current Hub static SentryId get lastEventId => _hub.lastEventId; - /// Adds a breacrumb to the current Scope + /// Adds a breadcrumb to the current Scope static Future addBreadcrumb(Breadcrumb crumb, {Hint? hint}) => _hub.addBreadcrumb(crumb, hint: hint); diff --git a/dart/lib/src/transport/task_queue.dart b/dart/lib/src/transport/task_queue.dart index bbd9d2f4c8..34daa81241 100644 --- a/dart/lib/src/transport/task_queue.dart +++ b/dart/lib/src/transport/task_queue.dart @@ -3,28 +3,39 @@ import 'dart:async'; import 'package:meta/meta.dart'; import '../../sentry.dart'; +import '../client_reports/client_report_recorder.dart'; +import '../client_reports/discard_reason.dart'; +import 'data_category.dart'; typedef Task = Future Function(); @internal abstract class TaskQueue { - Future enqueue(Task task, T fallbackResult, String warning); + Future enqueue(Task task, T fallbackResult, DataCategory category); } @internal class DefaultTaskQueue implements TaskQueue { - DefaultTaskQueue(this._maxQueueSize, this._logger); + DefaultTaskQueue(this._maxQueueSize, this._logger, this._recorder); final int _maxQueueSize; final SentryLogger _logger; + final ClientReportRecorder _recorder; int _queueCount = 0; @override - Future enqueue(Task task, T fallbackResult, String taskName) async { + Future enqueue( + Task task, + T fallbackResult, + DataCategory category, + ) async { if (_queueCount >= _maxQueueSize) { - _logger(SentryLevel.warning, - '$taskName dropped due to backpressure. Avoid capturing in a tight loop.'); + _recorder.recordLostEvent(DiscardReason.queueOverflow, category); + _logger( + SentryLevel.warning, + 'Task dropped due to reaching max ($_maxQueueSize} parallel tasks.).', + ); return fallbackResult; } else { _queueCount++; @@ -40,7 +51,11 @@ class DefaultTaskQueue implements TaskQueue { @internal class NoOpTaskQueue implements TaskQueue { @override - Future enqueue(Task task, T fallbackResult, String warning) { + Future enqueue( + Task task, + T fallbackResult, + DataCategory category, + ) { return task(); } } diff --git a/dart/test/transport/tesk_queue_test.dart b/dart/test/transport/tesk_queue_test.dart index 1a7b18c1f5..e61299173b 100644 --- a/dart/test/transport/tesk_queue_test.dart +++ b/dart/test/transport/tesk_queue_test.dart @@ -1,8 +1,11 @@ import 'dart:async'; +import 'package:sentry/src/client_reports/discard_reason.dart'; +import 'package:sentry/src/transport/data_category.dart'; import 'package:sentry/src/transport/task_queue.dart'; import 'package:test/test.dart'; +import '../mocks/mock_client_report_recorder.dart'; import '../test_utils.dart'; void main() { @@ -25,7 +28,7 @@ void main() { await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1, 'foo')); + }, -1, DataCategory.error)); } // This will always await the other futures, even if they are running longer, as it was scheduled after them. @@ -48,7 +51,7 @@ void main() { print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1, 'foo')); + }, -1, DataCategory.error)); } print('Started waiting for first 5 tasks'); @@ -62,7 +65,7 @@ void main() { print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1, 'foo')); + }, -1, DataCategory.error)); } print('Started waiting for second 5 tasks'); @@ -83,7 +86,7 @@ void main() { await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1, 'foo'); + }, -1, DataCategory.error); } expect(completedTasks, 10); }); @@ -98,20 +101,45 @@ void main() { await sut.enqueue(() async { completedTasks += 1; throw Error(); - }, -1, 'foo'); + }, -1, DataCategory.error); } catch (_) { // Ignore } } expect(completedTasks, 10); }); + + test('recording dropped event when category set', () async { + final sut = fixture.getSut(maxQueueSize: 5); + + for (int i = 0; i < 10; i++) { + unawaited(sut.enqueue(() async { + print('Task $i'); + return 1 + 1; + }, -1, DataCategory.error)); + } + + // This will always await the other futures, even if they are running longer, as it was scheduled after them. + print('Started waiting for first 5 tasks'); + await Future.delayed(Duration(milliseconds: 1)); + print('Stopped waiting for first 5 tasks'); + + expect(fixture.clientReportRecorder.discardedEvents.length, 5); + for (final event in fixture.clientReportRecorder.discardedEvents) { + expect(event.reason, DiscardReason.queueOverflow); + expect(event.category, DataCategory.error); + expect(event.quantity, 1); + } + }); }); } class Fixture { final options = defaultTestOptions(); + late var clientReportRecorder = MockClientReportRecorder(); + TaskQueue getSut({required int maxQueueSize}) { - return DefaultTaskQueue(maxQueueSize, options.logger); + return DefaultTaskQueue(maxQueueSize, options.logger, clientReportRecorder); } }