diff --git a/CHANGELOG.md b/CHANGELOG.md index a21020bdcc..9260793a20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,10 @@ ### Enhancements - Cache parsed DSN ([#2365](https://github.com/getsentry/sentry-dart/pull/2365)) - +- Handle backpressure earlier in pipeline ([#2371](https://github.com/getsentry/sentry-dart/pull/2371)) + - Drops max un-awaited parallel tasks earlier, so event processors & callbacks are not executed for them. + - Change by setting `SentryOptions.maxQueueSize`. Default is 30. + ## 8.10.0-beta.2 ### Fixes diff --git a/dart/lib/src/sentry.dart b/dart/lib/src/sentry.dart index 9ff45afcb4..cd89d2fd4a 100644 --- a/dart/lib/src/sentry.dart +++ b/dart/lib/src/sentry.dart @@ -24,6 +24,8 @@ import 'sentry_options.dart'; import 'sentry_user_feedback.dart'; import 'tracing.dart'; import 'sentry_attachment/sentry_attachment.dart'; +import 'transport/data_category.dart'; +import 'transport/task_queue.dart'; /// Configuration options callback typedef OptionsConfiguration = FutureOr Function(SentryOptions); @@ -34,6 +36,7 @@ typedef AppRunner = FutureOr Function(); /// Sentry SDK main entry point class Sentry { static Hub _hub = NoOpHub(); + static TaskQueue _taskQueue = NoOpTaskQueue(); Sentry._(); @@ -56,6 +59,11 @@ class Sentry { if (config is Future) { await config; } + _taskQueue = DefaultTaskQueue( + sentryOptions.maxQueueSize, + sentryOptions.logger, + sentryOptions.recorder, + ); } catch (exception, stackTrace) { sentryOptions.logger( SentryLevel.error, @@ -181,12 +189,17 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureEvent( - event, - stackTrace: stackTrace, - hint: hint, - withScope: withScope, - ); + _taskQueue.enqueue( + () => _hub.captureEvent( + event, + stackTrace: stackTrace, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + event.type != null + ? DataCategory.fromItemType(event.type!) + : DataCategory.unknown); /// Reports the [throwable] and optionally its [stackTrace] to Sentry.io. static Future captureException( @@ -195,11 +208,15 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureException( - throwable, - stackTrace: stackTrace, - hint: hint, - withScope: withScope, + _taskQueue.enqueue( + () => _hub.captureException( + throwable, + stackTrace: stackTrace, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + DataCategory.error, ); /// Reports a [message] to Sentry.io. @@ -211,13 +228,17 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureMessage( - message, - level: level, - template: template, - params: params, - hint: hint, - withScope: withScope, + _taskQueue.enqueue( + () => _hub.captureMessage( + message, + level: level, + template: template, + params: params, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + DataCategory.unknown, ); /// Reports a [userFeedback] to Sentry.io. @@ -236,7 +257,15 @@ class Sentry { Hint? hint, ScopeCallback? withScope, }) => - _hub.captureFeedback(feedback, hint: hint, withScope: withScope); + _taskQueue.enqueue( + () => _hub.captureFeedback( + feedback, + hint: hint, + withScope: withScope, + ), + SentryId.empty(), + DataCategory.unknown, + ); /// Close the client SDK static Future close() async { @@ -251,7 +280,7 @@ class Sentry { /// Last event id recorded by the current Hub static SentryId get lastEventId => _hub.lastEventId; - /// Adds a breacrumb to the current Scope + /// Adds a breadcrumb to the current Scope static Future addBreadcrumb(Breadcrumb crumb, {Hint? hint}) => _hub.addBreadcrumb(crumb, hint: hint); diff --git a/dart/lib/src/sentry_client.dart b/dart/lib/src/sentry_client.dart index 43e5e0be9d..0ff4f49af7 100644 --- a/dart/lib/src/sentry_client.dart +++ b/dart/lib/src/sentry_client.dart @@ -25,7 +25,6 @@ import 'transport/http_transport.dart'; import 'transport/noop_transport.dart'; import 'transport/rate_limiter.dart'; import 'transport/spotlight_http_transport.dart'; -import 'transport/task_queue.dart'; import 'utils/isolate_utils.dart'; import 'utils/regex_utils.dart'; import 'utils/stacktrace_utils.dart'; @@ -39,10 +38,6 @@ const _defaultIpAddress = '{{auto}}'; /// Logs crash reports and events to the Sentry.io service. class SentryClient { final SentryOptions _options; - late final _taskQueue = TaskQueue( - _options.maxQueueSize, - _options.logger, - ); final Random? _random; @@ -630,9 +625,6 @@ class SentryClient { Future _attachClientReportsAndSend(SentryEnvelope envelope) { final clientReport = _options.recorder.flush(); envelope.addClientReport(clientReport); - return _taskQueue.enqueue( - () => _options.transport.send(envelope), - SentryId.empty(), - ); + return _options.transport.send(envelope); } } diff --git a/dart/lib/src/transport/task_queue.dart b/dart/lib/src/transport/task_queue.dart index 386a4980f2..34daa81241 100644 --- a/dart/lib/src/transport/task_queue.dart +++ b/dart/lib/src/transport/task_queue.dart @@ -1,21 +1,41 @@ import 'dart:async'; +import 'package:meta/meta.dart'; + import '../../sentry.dart'; +import '../client_reports/client_report_recorder.dart'; +import '../client_reports/discard_reason.dart'; +import 'data_category.dart'; typedef Task = Future Function(); -class TaskQueue { - TaskQueue(this._maxQueueSize, this._logger); +@internal +abstract class TaskQueue { + Future enqueue(Task task, T fallbackResult, DataCategory category); +} + +@internal +class DefaultTaskQueue implements TaskQueue { + DefaultTaskQueue(this._maxQueueSize, this._logger, this._recorder); final int _maxQueueSize; final SentryLogger _logger; + final ClientReportRecorder _recorder; int _queueCount = 0; - Future enqueue(Task task, T fallbackResult) async { + @override + Future enqueue( + Task task, + T fallbackResult, + DataCategory category, + ) async { if (_queueCount >= _maxQueueSize) { - _logger(SentryLevel.warning, - 'Task dropped due to backpressure. Avoid capturing in a tight loop.'); + _recorder.recordLostEvent(DiscardReason.queueOverflow, category); + _logger( + SentryLevel.warning, + 'Task dropped due to reaching max ($_maxQueueSize} parallel tasks.).', + ); return fallbackResult; } else { _queueCount++; @@ -27,3 +47,15 @@ class TaskQueue { } } } + +@internal +class NoOpTaskQueue implements TaskQueue { + @override + Future enqueue( + Task task, + T fallbackResult, + DataCategory category, + ) { + return task(); + } +} diff --git a/dart/test/transport/tesk_queue_test.dart b/dart/test/transport/tesk_queue_test.dart index af22672d97..e61299173b 100644 --- a/dart/test/transport/tesk_queue_test.dart +++ b/dart/test/transport/tesk_queue_test.dart @@ -1,8 +1,11 @@ import 'dart:async'; +import 'package:sentry/src/client_reports/discard_reason.dart'; +import 'package:sentry/src/transport/data_category.dart'; import 'package:sentry/src/transport/task_queue.dart'; import 'package:test/test.dart'; +import '../mocks/mock_client_report_recorder.dart'; import '../test_utils.dart'; void main() { @@ -25,7 +28,7 @@ void main() { await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1)); + }, -1, DataCategory.error)); } // This will always await the other futures, even if they are running longer, as it was scheduled after them. @@ -48,7 +51,7 @@ void main() { print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1)); + }, -1, DataCategory.error)); } print('Started waiting for first 5 tasks'); @@ -62,7 +65,7 @@ void main() { print('Completed task $i'); completedTasks += 1; return 1 + 1; - }, -1)); + }, -1, DataCategory.error)); } print('Started waiting for second 5 tasks'); @@ -83,7 +86,7 @@ void main() { await Future.delayed(Duration(milliseconds: 1)); completedTasks += 1; return 1 + 1; - }, -1); + }, -1, DataCategory.error); } expect(completedTasks, 10); }); @@ -98,20 +101,45 @@ void main() { await sut.enqueue(() async { completedTasks += 1; throw Error(); - }, -1); + }, -1, DataCategory.error); } catch (_) { // Ignore } } expect(completedTasks, 10); }); + + test('recording dropped event when category set', () async { + final sut = fixture.getSut(maxQueueSize: 5); + + for (int i = 0; i < 10; i++) { + unawaited(sut.enqueue(() async { + print('Task $i'); + return 1 + 1; + }, -1, DataCategory.error)); + } + + // This will always await the other futures, even if they are running longer, as it was scheduled after them. + print('Started waiting for first 5 tasks'); + await Future.delayed(Duration(milliseconds: 1)); + print('Stopped waiting for first 5 tasks'); + + expect(fixture.clientReportRecorder.discardedEvents.length, 5); + for (final event in fixture.clientReportRecorder.discardedEvents) { + expect(event.reason, DiscardReason.queueOverflow); + expect(event.category, DataCategory.error); + expect(event.quantity, 1); + } + }); }); } class Fixture { final options = defaultTestOptions(); + late var clientReportRecorder = MockClientReportRecorder(); + TaskQueue getSut({required int maxQueueSize}) { - return TaskQueue(maxQueueSize, options.logger); + return DefaultTaskQueue(maxQueueSize, options.logger, clientReportRecorder); } }