diff --git a/CHANGELOG.md b/CHANGELOG.md index 611bdf693e..4818fc6767 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - Call options.log for structured logs ([#3187](https://github.com/getsentry/sentry-dart/pull/3187)) - Remove async usage from `FlutterErrorIntegration` ([#3202](https://github.com/getsentry/sentry-dart/pull/3202)) - Tag all spans during app start with start type info ([#3190](https://github.com/getsentry/sentry-dart/pull/3190)) +- Improve `SentryLogBatcher` flush logic ([#3211](https://github.com/getsentry/sentry-dart/pull/3211)) ### Dependencies diff --git a/packages/dart/lib/src/sentry_envelope.dart b/packages/dart/lib/src/sentry_envelope.dart index 83cccb14b7..23cef605a3 100644 --- a/packages/dart/lib/src/sentry_envelope.dart +++ b/packages/dart/lib/src/sentry_envelope.dart @@ -1,4 +1,5 @@ import 'dart:convert'; +import 'dart:typed_data'; import 'client_reports/client_report.dart'; import 'protocol.dart'; @@ -9,6 +10,7 @@ import 'sentry_item_type.dart'; import 'sentry_options.dart'; import 'sentry_trace_context_header.dart'; import 'utils.dart'; +import 'package:meta/meta.dart'; /// Class representation of `Envelope` file. class SentryEnvelope { @@ -96,6 +98,37 @@ class SentryEnvelope { ); } + /// Create a [SentryEnvelope] containing raw log data payload. + /// This is used by the log batcher to send pre-encoded log batches. 
+ @internal + factory SentryEnvelope.fromLogsData( + List> encodedLogs, + SdkVersion sdkVersion, + ) { + // Create the payload in the format expected by Sentry + // Format: {"items": [log1, log2, ...]} + final builder = BytesBuilder(copy: false); + builder.add(utf8.encode('{"items":[')); + for (int i = 0; i < encodedLogs.length; i++) { + if (i > 0) { + builder.add(utf8.encode(',')); + } + builder.add(encodedLogs[i]); + } + builder.add(utf8.encode(']}')); + + return SentryEnvelope( + SentryEnvelopeHeader( + null, + sdkVersion, + ), + [ + SentryEnvelopeItem.fromLogsData( + builder.takeBytes(), encodedLogs.length), + ], + ); + } + /// Stream binary data representation of `Envelope` file encoded. Stream> envelopeStream(SentryOptions options) async* { yield utf8JsonEncoder.convert(header.toJson()); diff --git a/packages/dart/lib/src/sentry_envelope_item.dart b/packages/dart/lib/src/sentry_envelope_item.dart index 06261380db..f626d97882 100644 --- a/packages/dart/lib/src/sentry_envelope_item.dart +++ b/packages/dart/lib/src/sentry_envelope_item.dart @@ -6,6 +6,7 @@ import 'sentry_attachment/sentry_attachment.dart'; import 'sentry_envelope_item_header.dart'; import 'sentry_item_type.dart'; import 'utils.dart'; +import 'package:meta/meta.dart'; /// Item holding header information and JSON encoded data. class SentryEnvelopeItem { @@ -78,6 +79,21 @@ class SentryEnvelopeItem { ); } + /// Create a [SentryEnvelopeItem] which holds pre-encoded log data. + /// This is used by the log batcher to send pre-encoded log batches. + @internal + factory SentryEnvelopeItem.fromLogsData(List payload, int logsCount) { + return SentryEnvelopeItem( + SentryEnvelopeItemHeader( + SentryItemType.log, + itemCount: logsCount, + contentType: 'application/vnd.sentry.items.log+json', + ), + () => payload, + originalObject: null, + ); + } + /// Header with info about type and length of data in bytes. 
final SentryEnvelopeItemHeader header; diff --git a/packages/dart/lib/src/sentry_log_batcher.dart b/packages/dart/lib/src/sentry_log_batcher.dart index d5f023f979..8fa4566a11 100644 --- a/packages/dart/lib/src/sentry_log_batcher.dart +++ b/packages/dart/lib/src/sentry_log_batcher.dart @@ -1,52 +1,97 @@ import 'dart:async'; -import 'sentry_envelope.dart'; import 'sentry_options.dart'; import 'protocol/sentry_log.dart'; +import 'protocol/sentry_level.dart'; +import 'sentry_envelope.dart'; +import 'utils.dart'; import 'package:meta/meta.dart'; @internal class SentryLogBatcher { - SentryLogBatcher(this._options, {Duration? flushTimeout, int? maxBufferSize}) - : _flushTimeout = flushTimeout ?? Duration(seconds: 5), - _maxBufferSize = maxBufferSize ?? 100; + SentryLogBatcher( + this._options, { + Duration? flushTimeout, + int? maxBufferSizeBytes, + }) : _flushTimeout = flushTimeout ?? Duration(seconds: 5), + _maxBufferSizeBytes = maxBufferSizeBytes ?? + 1024 * 1024; // 1MB default per BatchProcessor spec final SentryOptions _options; final Duration _flushTimeout; - final int _maxBufferSize; + final int _maxBufferSizeBytes; - final _logBuffer = []; + // Store encoded log data instead of raw logs to avoid re-serialization + final List> _encodedLogs = []; + int _encodedLogsSize = 0; Timer? _flushTimer; + /// Adds a log to the buffer. 
void addLog(SentryLog log) { - _logBuffer.add(log); + try { + final encodedLog = utf8JsonEncoder.convert(log.toJson()); - _flushTimer?.cancel(); + _encodedLogs.add(encodedLog); + _encodedLogsSize += encodedLog.length; - if (_logBuffer.length >= _maxBufferSize) { - return flush(); - } else { - _flushTimer = Timer(_flushTimeout, flush); + // Flush if size threshold is reached + if (_encodedLogsSize >= _maxBufferSizeBytes) { + // Buffer size exceeded, flush immediately + _performFlushLogs(); + } else if (_flushTimer == null) { + // Start timeout only when first item is added + _startTimer(); + } + // Note: We don't restart the timer on subsequent additions per spec + } catch (error) { + _options.log( + SentryLevel.error, + 'Failed to encode log: $error', + ); } } + /// Flushes the buffer immediately, sending all buffered logs. void flush() { + _performFlushLogs(); + } + + void _startTimer() { + _flushTimer = Timer(_flushTimeout, () { + _options.log( + SentryLevel.debug, + 'SentryLogBatcher: Timer fired, calling _performFlushLogs().', + ); + _performFlushLogs(); + }); + } + + void _performFlushLogs() { + // Reset timer state first _flushTimer?.cancel(); _flushTimer = null; - final logs = List.from(_logBuffer); - _logBuffer.clear(); + // Reset buffer on function exit + final logsToSend = List>.from(_encodedLogs); + _encodedLogs.clear(); + _encodedLogsSize = 0; - if (logs.isEmpty) { + if (logsToSend.isEmpty) { + _options.log( + SentryLevel.debug, + 'SentryLogBatcher: No logs to flush.', + ); return; } - final envelope = SentryEnvelope.fromLogs( - logs, - _options.sdk, - ); - - // TODO: Make sure the Android SDK understands the log envelope type. 
- _options.transport.send(envelope); + try { + final envelope = SentryEnvelope.fromLogsData(logsToSend, _options.sdk); + _options.transport.send(envelope); + } catch (error) { + _options.log( + SentryLevel.error, + 'Failed to send batched logs: $error', + ); + } } } diff --git a/packages/dart/test/sentry_envelope_item_test.dart b/packages/dart/test/sentry_envelope_item_test.dart index 741773a66a..a8b2bd2743 100644 --- a/packages/dart/test/sentry_envelope_item_test.dart +++ b/packages/dart/test/sentry_envelope_item_test.dart @@ -130,7 +130,32 @@ void main() { expect(sut.header.contentType, 'application/vnd.sentry.items.log+json'); expect(sut.header.type, SentryItemType.log); + expect(sut.header.itemCount, 2); expect(actualData, expectedData); }); + + test('fromLogsData', () async { + final payload = + utf8.encode('{"items":[{"test":"data1"},{"test":"data2"}]'); + final logsCount = 2; + + final sut = SentryEnvelopeItem.fromLogsData(payload, logsCount); + + expect(sut.header.contentType, 'application/vnd.sentry.items.log+json'); + expect(sut.header.type, SentryItemType.log); + expect(sut.header.itemCount, logsCount); + + final actualData = await sut.dataFactory(); + expect(actualData, payload); + }); + + test('fromLogsData null original object', () async { + final payload = utf8.encode('{"items":[{"test":"data"}]}'); + final logsCount = 1; + + final sut = SentryEnvelopeItem.fromLogsData(payload, logsCount); + + expect(sut.originalObject, null); + }); }); } diff --git a/packages/dart/test/sentry_envelope_test.dart b/packages/dart/test/sentry_envelope_test.dart index 9a21339aaa..eaa1af75c4 100644 --- a/packages/dart/test/sentry_envelope_test.dart +++ b/packages/dart/test/sentry_envelope_test.dart @@ -173,6 +173,42 @@ void main() { expect(actualItemData, expectedItemData); }); + test('fromLogsData', () async { + final encodedLogs = [ + utf8.encode( + '{"timestamp":"2023-01-01T00:00:00.000Z","level":"info","body":"test1","attributes":{}}'), + utf8.encode( + 
'{"timestamp":"2023-01-01T00:00:01.000Z","level":"info","body":"test2","attributes":{}}'), + ]; + + final sdkVersion = + SdkVersion(name: 'fixture-name', version: 'fixture-version'); + final sut = SentryEnvelope.fromLogsData(encodedLogs, sdkVersion); + + expect(sut.header.eventId, null); + expect(sut.header.sdkVersion, sdkVersion); + expect(sut.items.length, 1); + + final expectedEnvelopeItem = SentryEnvelopeItem.fromLogsData( + // The envelope should create the final payload with {"items": [...]} wrapper + utf8.encode('{"items":[') + + encodedLogs[0] + + utf8.encode(',') + + encodedLogs[1] + + utf8.encode(']}'), + 2, // logsCount + ); + + expect(sut.items[0].header.contentType, + expectedEnvelopeItem.header.contentType); + expect(sut.items[0].header.type, expectedEnvelopeItem.header.type); + expect(sut.items[0].header.itemCount, 2); + + final actualItem = await sut.items[0].dataFactory(); + final expectedItem = await expectedEnvelopeItem.dataFactory(); + expect(actualItem, expectedItem); + }); + test('max attachment size', () async { final attachment = SentryAttachment.fromLoader( loader: () => Uint8List.fromList([1, 2, 3, 4]), diff --git a/packages/dart/test/sentry_log_batcher_test.dart b/packages/dart/test/sentry_log_batcher_test.dart index 5e9f7cf5f3..dda0eabaa4 100644 --- a/packages/dart/test/sentry_log_batcher_test.dart +++ b/packages/dart/test/sentry_log_batcher_test.dart @@ -1,8 +1,8 @@ -import 'package:test/test.dart'; +import 'dart:async'; + +import 'package:sentry/sentry.dart'; import 'package:sentry/src/sentry_log_batcher.dart'; -import 'package:sentry/src/sentry_options.dart'; -import 'package:sentry/src/protocol/sentry_log.dart'; -import 'package:sentry/src/protocol/sentry_log_level.dart'; +import 'package:test/test.dart'; import 'mocks/mock_transport.dart'; @@ -48,8 +48,10 @@ void main() { expect(envelopePayloadJson['items'].last['body'], log2.body); }); - test('max logs are flushed without timeout', () async { - final batcher = 
fixture.getSut(maxBufferSize: 10); + test('logs exceeding max size are flushed without timeout', () async { + // Use a buffer size that can hold multiple logs before triggering flush + // Each log is ~153 bytes, so 300 bytes can hold 1 log, triggering flush on 2nd + final batcher = fixture.getSut(maxBufferSizeBytes: 300); final log = SentryLog( timestamp: DateTime.now(), @@ -58,9 +60,12 @@ void main() { attributes: {}, ); - for (var i = 0; i < 10; i++) { - batcher.addLog(log); - } + // Add first log - should fit in buffer + batcher.addLog(log); + expect(fixture.mockTransport.envelopes.length, 0); + + // Add second log - should exceed buffer and trigger flush + batcher.addLog(log); // Just wait a little bit, as we call capture without awaiting internally. await Future.delayed(Duration(milliseconds: 1)); @@ -69,15 +74,11 @@ void main() { final envelopePayloadJson = (fixture.mockTransport).logs.first; expect(envelopePayloadJson, isNotNull); - expect(envelopePayloadJson['items'].length, 10); + expect(envelopePayloadJson['items'].length, 2); }); - test('more than max logs are flushed eventuelly', () async { - final flushTimeout = Duration(milliseconds: 100); - final batcher = fixture.getSut( - maxBufferSize: 10, - flushTimeout: flushTimeout, - ); + test('calling flush directly flushes logs', () async { + final batcher = fixture.getSut(); final log = SentryLog( timestamp: DateTime.now(), @@ -86,27 +87,24 @@ void main() { attributes: {}, ); - for (var i = 0; i < 15; i++) { - batcher.addLog(log); - } - - await Future.delayed(flushTimeout); - - expect(fixture.mockTransport.envelopes.length, 2); - - final firstEnvelopePayloadJson = (fixture.mockTransport).logs.first; + batcher.addLog(log); + batcher.addLog(log); + batcher.flush(); - expect(firstEnvelopePayloadJson, isNotNull); - expect(firstEnvelopePayloadJson['items'].length, 10); + // Just wait a little bit, as we call capture without awaiting internally. 
+ await Future.delayed(Duration(milliseconds: 1)); - final secondEnvelopePayloadJson = (fixture.mockTransport).logs.last; + expect(fixture.mockTransport.envelopes.length, 1); + final envelopePayloadJson = (fixture.mockTransport).logs.first; - expect(secondEnvelopePayloadJson, isNotNull); - expect(secondEnvelopePayloadJson['items'].length, 5); + expect(envelopePayloadJson, isNotNull); + expect(envelopePayloadJson['items'].length, 2); }); - test('calling flush directly flushes logs', () async { - final batcher = fixture.getSut(); + test('timeout is only started once and not restarted on subsequent additions', + () async { + final flushTimeout = Duration(milliseconds: 100); + final batcher = fixture.getSut(flushTimeout: flushTimeout); final log = SentryLog( timestamp: DateTime.now(), @@ -115,17 +113,20 @@ void main() { attributes: {}, ); + // Add first log - should start timer batcher.addLog(log); + expect(fixture.mockTransport.envelopes.length, 0); + + // Add second log immediately - should NOT restart timer batcher.addLog(log); - batcher.flush(); + expect(fixture.mockTransport.envelopes.length, 0); - // Just wait a little bit, as we call capture without awaiting internally. - await Future.delayed(Duration(milliseconds: 1)); + // Wait for timeout to fire + await Future.delayed(flushTimeout + Duration(milliseconds: 10)); + // Should have sent both logs after timeout expect(fixture.mockTransport.envelopes.length, 1); final envelopePayloadJson = (fixture.mockTransport).logs.first; - - expect(envelopePayloadJson, isNotNull); expect(envelopePayloadJson['items'].length, 2); }); } @@ -138,11 +139,11 @@ class Fixture { options.transport = mockTransport; } - SentryLogBatcher getSut({Duration? flushTimeout, int? maxBufferSize}) { + SentryLogBatcher getSut({Duration? flushTimeout, int? maxBufferSizeBytes}) { return SentryLogBatcher( options, flushTimeout: flushTimeout, - maxBufferSize: maxBufferSize, + maxBufferSizeBytes: maxBufferSizeBytes, ); } }