From 7ba8c7b9381c84d0754c579ee1de1740438783cd Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 9 Feb 2023 15:49:21 +0100 Subject: [PATCH] feat(replay): Keep min. 30 seconds of data in error mode --- packages/replay/src/constants.ts | 4 +- .../src/eventBuffer/EventBufferArray.ts | 38 +++-- .../EventBufferCompressionWorker.ts | 12 ++ ...EventBufferPartitionedCompressionWorker.ts | 46 ++++++ .../src/eventBuffer/EventBufferProxy.ts | 25 ++- .../src/eventBuffer/PartitionedQueue.ts | 49 ++++++ packages/replay/src/eventBuffer/index.ts | 5 +- packages/replay/src/replay.ts | 1 + packages/replay/src/types.ts | 8 +- packages/replay/src/util/addEvent.ts | 23 ++- .../test/integration/errorSampleRate.test.ts | 21 ++- .../unit/eventBuffer/EventBufferArray.test.ts | 60 ++++--- .../EventBufferCompressionWorker.test.ts | 13 ++ ...BufferPartitionedCompressionWorker.test.ts | 148 ++++++++++++++++++ .../unit/eventBuffer/EventBufferProxy.test.ts | 3 +- .../unit/eventBuffer/PartitionedQueue.test.ts | 58 +++++++ .../replay/test/unit/util/addEvent.test.ts | 100 +++++++++++- .../replay/test/utils/setupReplayContainer.ts | 6 +- 18 files changed, 553 insertions(+), 67 deletions(-) create mode 100644 packages/replay/src/eventBuffer/EventBufferPartitionedCompressionWorker.ts create mode 100644 packages/replay/src/eventBuffer/PartitionedQueue.ts create mode 100644 packages/replay/test/unit/eventBuffer/EventBufferPartitionedCompressionWorker.test.ts create mode 100644 packages/replay/test/unit/eventBuffer/PartitionedQueue.test.ts diff --git a/packages/replay/src/constants.ts b/packages/replay/src/constants.ts index 87bf1823b056..751f9c827675 100644 --- a/packages/replay/src/constants.ts +++ b/packages/replay/src/constants.ts @@ -24,8 +24,8 @@ export const MAX_SESSION_LIFE = 3_600_000; // 60 minutes export const DEFAULT_FLUSH_MIN_DELAY = 5_000; export const DEFAULT_FLUSH_MAX_DELAY = 5_000; -/* How long to wait for error checkouts */ -export const ERROR_CHECKOUT_TIME = 60_000; +/* How often to capture a full checkout when in error mode */ +export const ERROR_CHECKOUT_TIME = 30_000; export const RETRY_BASE_INTERVAL = 5000; export const RETRY_MAX_COUNT = 3; diff --git a/packages/replay/src/eventBuffer/EventBufferArray.ts b/packages/replay/src/eventBuffer/EventBufferArray.ts index 3ec54a526b9d..6dabc9fd0f76 100644 --- a/packages/replay/src/eventBuffer/EventBufferArray.ts +++ b/packages/replay/src/eventBuffer/EventBufferArray.ts @@ -1,4 +1,7 @@ +import type { ReplayRecordingData } from '@sentry/types'; + import type { AddEventResult, EventBuffer, RecordingEvent } from '../types'; +import { PartitionedQueue } from './PartitionedQueue'; /** * A basic event buffer that does not do any compression. @@ -6,42 +9,51 @@ import type { AddEventResult, EventBuffer, RecordingEvent } from '../types'; */ export class EventBufferArray implements EventBuffer { /** All the events that are buffered to be sent. */ - public events: RecordingEvent[]; + protected _events: PartitionedQueue; public constructor() { - this.events = []; + this._events = new PartitionedQueue(); + } + + /** @inheritdoc */ + public get events(): RecordingEvent[] { + return this._events.getItems(); } /** @inheritdoc */ public get hasEvents(): boolean { - return this.events.length > 0; + return this._events.getLength() > 0; } /** @inheritdoc */ public destroy(): void { - this.events = []; + this._events.clear(); } /** @inheritdoc */ - public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise { - if (isCheckout) { - this.events = [event]; - return; - } + public async clear(keepLastCheckout?: boolean): Promise { + this._events.clear(keepLastCheckout); + } - this.events.push(event); - return; + /** @inheritdoc */ + public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise { + this._events.add(event, isCheckout); } /** @inheritdoc */ - public finish(): Promise { + public finish(): Promise { return new Promise(resolve => { // Make a copy of the events array reference and immediately clear the // events member so that we do not lose new events while uploading // attachment. const eventsRet = this.events; - this.events = []; + this._events.clear(); resolve(JSON.stringify(eventsRet)); }); } + + /** @inheritdoc */ + public getEarliestTimestamp(): number | null { + return this.events.map(event => event.timestamp).sort()[0] || null; + } } diff --git a/packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts b/packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts index 3d532b68df28..135668285727 100644 --- a/packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts +++ b/packages/replay/src/eventBuffer/EventBufferCompressionWorker.ts @@ -57,6 +57,18 @@ export class EventBufferCompressionWorker implements EventBuffer { return this._finishRequest(); } + /** @inheritdoc */ + public clear(): Promise { + // NOTE: We do not support keeping the last checkout in this implementation. + return this._clear(); + } + + /** @inheritdoc */ + public getEarliestTimestamp(): number | null { + // Not supported in this mode + return null; + } + /** * Send the event to the worker. */ diff --git a/packages/replay/src/eventBuffer/EventBufferPartitionedCompressionWorker.ts b/packages/replay/src/eventBuffer/EventBufferPartitionedCompressionWorker.ts new file mode 100644 index 000000000000..88fe0817b08f --- /dev/null +++ b/packages/replay/src/eventBuffer/EventBufferPartitionedCompressionWorker.ts @@ -0,0 +1,46 @@ +import type { ReplayRecordingData } from '@sentry/types'; + +import type { RecordingEvent } from '../types'; +import { EventBufferArray } from './EventBufferArray'; +import { WorkerHandler } from './WorkerHandler'; + +/** + * Event buffer that uses a web worker to compress events. + * Exported only for testing. + */ +export class EventBufferPartitionedCompressionWorker extends EventBufferArray { + private _worker: WorkerHandler; + + public constructor(worker: Worker) { + super(); + this._worker = new WorkerHandler(worker); + } + /** + * Ensure the worker is ready (or not). + * This will either resolve when the worker is ready, or reject if an error occured. + */ + public ensureReady(): Promise { + return this._worker.ensureReady(); + } + + /** @inheritdoc */ + public destroy(): void { + this._worker.destroy(); + super.destroy(); + } + + /** + * Finish the event buffer and return the compressed data. + */ + public finish(): Promise { + const { events } = this; + this._events.clear(); + + return this._compressEvents(events); + } + + /** Compress a given array of events at once. */ + private _compressEvents(events: RecordingEvent[]): Promise { + return this._worker.postMessage('compress', JSON.stringify(events)); + } +} diff --git a/packages/replay/src/eventBuffer/EventBufferProxy.ts b/packages/replay/src/eventBuffer/EventBufferProxy.ts index 24c5fa85f3a1..f33babd06901 100644 --- a/packages/replay/src/eventBuffer/EventBufferProxy.ts +++ b/packages/replay/src/eventBuffer/EventBufferProxy.ts @@ -4,6 +4,7 @@ import { logger } from '@sentry/utils'; import type { AddEventResult, EventBuffer, RecordingEvent } from '../types'; import { EventBufferArray } from './EventBufferArray'; import { EventBufferCompressionWorker } from './EventBufferCompressionWorker'; +import { EventBufferPartitionedCompressionWorker } from './EventBufferPartitionedCompressionWorker'; /** * This proxy will try to use the compression worker, and fall back to use the simple buffer if an error occurs there. @@ -12,15 +13,21 @@ import { EventBufferCompressionWorker } from './EventBufferCompressionWorker'; */ export class EventBufferProxy implements EventBuffer { private _fallback: EventBufferArray; - private _compression: EventBufferCompressionWorker; + private _compression: EventBufferCompressionWorker | EventBufferPartitionedCompressionWorker; private _used: EventBuffer; private _ensureWorkerIsLoadedPromise: Promise; - public constructor(worker: Worker) { + public constructor(worker: Worker, keepLastCheckout: boolean) { this._fallback = new EventBufferArray(); - this._compression = new EventBufferCompressionWorker(worker); - this._used = this._fallback; + // In error mode, we use the partitioned compression worker, which does not use compression streaming + // Instead, all events are sent at finish-time, as we need to continuously modify the queued events + // In session mode, we use a streaming compression implementation, which is more performant + this._compression = keepLastCheckout + ? new EventBufferPartitionedCompressionWorker(worker) + : new EventBufferCompressionWorker(worker); + + this._used = this._fallback; this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded(); } @@ -52,11 +59,21 @@ export class EventBufferProxy implements EventBuffer { return this._used.finish(); } + /** @inheritdoc */ + public clear(keepLastCheckout?: boolean): Promise { + return this._used.clear(keepLastCheckout); + } + /** Ensure the worker has loaded. */ public ensureWorkerIsLoaded(): Promise { return this._ensureWorkerIsLoadedPromise; } + /** @inheritdoc */ + public getEarliestTimestamp(): number | null { + return this._used.getEarliestTimestamp(); + } + /** Actually check if the worker has been loaded. */ private async _ensureWorkerIsLoaded(): Promise { try { diff --git a/packages/replay/src/eventBuffer/PartitionedQueue.ts b/packages/replay/src/eventBuffer/PartitionedQueue.ts new file mode 100644 index 000000000000..6161d24afc9e --- /dev/null +++ b/packages/replay/src/eventBuffer/PartitionedQueue.ts @@ -0,0 +1,49 @@ +/** + * A queue with partitions for each checkout. + */ +export class PartitionedQueue { + private _items: T[]; + private _lastCheckoutPos?: number; + + public constructor() { + this._items = []; + } + + /** Add an item to the queue. */ + public add(record: T, isCheckout?: boolean): void { + this._items.push(record); + + if (isCheckout) { + this._lastCheckoutPos = this._items.length - 1; + } + } + + /** + * Clear items from the queue. + * If `keepLastCheckout` is given, all items after the last checkout will be kept. + */ + public clear(keepLastCheckout?: boolean): void { + if (!keepLastCheckout) { + this._items = []; + this._lastCheckoutPos = undefined; + return; + } + + if (this._lastCheckoutPos) { + this._items = this._items.splice(this._lastCheckoutPos); + this._lastCheckoutPos = undefined; + } + + // Else, there is only a single checkout recorded yet, which we don't want to clear out + } + + /** Get all items */ + public getItems(): T[] { + return this._items; + } + + /** Get the number of items that are queued. */ + public getLength(): number { + return this._items.length; + } +} diff --git a/packages/replay/src/eventBuffer/index.ts b/packages/replay/src/eventBuffer/index.ts index f0eb83c68243..cc6cac37649e 100644 --- a/packages/replay/src/eventBuffer/index.ts +++ b/packages/replay/src/eventBuffer/index.ts @@ -7,12 +7,13 @@ import { EventBufferProxy } from './EventBufferProxy'; interface CreateEventBufferParams { useCompression: boolean; + keepLastCheckout: boolean; } /** * Create an event buffer for replays. */ -export function createEventBuffer({ useCompression }: CreateEventBufferParams): EventBuffer { +export function createEventBuffer({ useCompression, keepLastCheckout }: CreateEventBufferParams): EventBuffer { // eslint-disable-next-line no-restricted-globals if (useCompression && window.Worker) { try { @@ -20,7 +21,7 @@ export function createEventBuffer({ useCompression }: CreateEventBufferParams): __DEBUG_BUILD__ && logger.log('[Replay] Using compression worker'); const worker = new Worker(workerUrl); - return new EventBufferProxy(worker); + return new EventBufferProxy(worker, keepLastCheckout); } catch (error) { __DEBUG_BUILD__ && logger.log('[Replay] Failed to create compression worker'); // Fall back to use simple event buffer array diff --git a/packages/replay/src/replay.ts b/packages/replay/src/replay.ts index c162a26cd5da..85012a1888f1 100644 --- a/packages/replay/src/replay.ts +++ b/packages/replay/src/replay.ts @@ -180,6 +180,7 @@ export class ReplayContainer implements ReplayContainerInterface { this.eventBuffer = createEventBuffer({ useCompression: this._options.useCompression, + keepLastCheckout: this.recordingMode === 'error', }); this._addListeners(); diff --git a/packages/replay/src/types.ts b/packages/replay/src/types.ts index 27c878a3edba..44e9435ef65c 100644 --- a/packages/replay/src/types.ts +++ b/packages/replay/src/types.ts @@ -23,7 +23,7 @@ export interface SendReplayData { */ export interface WorkerRequest { id: number; - method: 'clear' | 'addEvent' | 'finish'; + method: 'clear' | 'addEvent' | 'finish' | 'compress'; arg?: string; } @@ -280,6 +280,12 @@ export interface EventBuffer { * Clears and returns the contents of the buffer. */ finish(): Promise; + + /** Clear the buffer, and optional ensure we keep the last checkout. */ + clear(keepLastCheckout?: boolean): Promise; + + /** Get the earliest timestamp of a pending event. */ + getEarliestTimestamp(): number | null; } export type AddUpdateCallback = () => boolean | void; diff --git a/packages/replay/src/util/addEvent.ts b/packages/replay/src/util/addEvent.ts index e243ce11b56b..46dea20cc9ed 100644 --- a/packages/replay/src/util/addEvent.ts +++ b/packages/replay/src/util/addEvent.ts @@ -12,7 +12,9 @@ export async function addEvent( event: RecordingEvent, isCheckout?: boolean, ): Promise { - if (!replay.eventBuffer) { + const { eventBuffer, session } = replay; + + if (!eventBuffer || !session) { // This implies that `_isEnabled` is false return null; } @@ -38,12 +40,27 @@ export async function addEvent( // Only record earliest event if a new session was created, otherwise it // shouldn't be relevant const earliestEvent = replay.getContext().earliestEvent; - if (replay.session && replay.session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) { + if (session.segmentId === 0 && (!earliestEvent || timestampInMs < earliestEvent)) { replay.getContext().earliestEvent = timestampInMs; } try { - return await replay.eventBuffer.addEvent(event, isCheckout); + if (isCheckout) { + if (replay.recordingMode === 'error') { + // Do not wait on it, just do it + // We know in this mode this is actually "sync" + void eventBuffer.clear(true); + + // Ensure we have the correct first checkout timestamp when an error occurs + if (!session.segmentId) { + replay.getContext().earliestEvent = eventBuffer.getEarliestTimestamp(); + } + } else { + await eventBuffer.clear(); + } + } + + return await eventBuffer.addEvent(event, isCheckout); } catch (error) { __DEBUG_BUILD__ && logger.error(error); replay.stop(); diff --git a/packages/replay/test/integration/errorSampleRate.test.ts b/packages/replay/test/integration/errorSampleRate.test.ts index 5ba806734cae..34580b79ac69 100644 --- a/packages/replay/test/integration/errorSampleRate.test.ts +++ b/packages/replay/test/integration/errorSampleRate.test.ts @@ -33,7 +33,8 @@ describe('Integration | errorSampleRate', () => { beforeEach(async () => { ({ mockRecord, domHandler, replay } = await resetSdkMock({ replayOptions: { - stickySession: true, + stickySession: false, + useCompression: false, }, sentryOptions: { replaysSessionSampleRate: 0.0, @@ -326,11 +327,10 @@ describe('Integration | errorSampleRate', () => { }); }); - it('has correct timestamps when error occurs much later than initial pageload/checkout', async () => { + it('keeps up to the last two checkout events', async () => { const ELAPSED = ERROR_CHECKOUT_TIME; const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }; mockRecord._emitter(TEST_EVENT); - // add a mock performance event replay.performanceEvents.push(PerformanceEntryResource()); @@ -356,21 +356,18 @@ describe('Integration | errorSampleRate', () => { jest.advanceTimersByTime(20); await new Promise(process.nextTick); - expect(replay.session?.started).toBe(BASE_TIMESTAMP + ELAPSED + 20); + expect(replay.session?.started).toBe(BASE_TIMESTAMP); - // Does not capture mouse click + // Does capture everything from the previous checkout expect(replay).toHaveSentReplay({ recordingPayloadHeader: { segment_id: 0 }, replayEventPayload: expect.objectContaining({ - // Make sure the old performance event is thrown out - replay_start_timestamp: (BASE_TIMESTAMP + ELAPSED + 20) / 1000, + replay_start_timestamp: BASE_TIMESTAMP / 1000, }), recordingData: JSON.stringify([ - { - data: { isCheckout: true }, - timestamp: BASE_TIMESTAMP + ELAPSED + 20, - type: 2, - }, + { data: { isCheckout: true }, timestamp: BASE_TIMESTAMP, type: 2 }, + { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }, + { data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + ELAPSED + 20, type: 2 }, ]), }); }); diff --git a/packages/replay/test/unit/eventBuffer/EventBufferArray.test.ts b/packages/replay/test/unit/eventBuffer/EventBufferArray.test.ts index 36d0b5aa0e00..30c271f29652 100644 --- a/packages/replay/test/unit/eventBuffer/EventBufferArray.test.ts +++ b/packages/replay/test/unit/eventBuffer/EventBufferArray.test.ts @@ -1,45 +1,53 @@ +import type { ReplayRecordingMode } from '@sentry/types'; + import { createEventBuffer } from './../../../src/eventBuffer'; import { BASE_TIMESTAMP } from './../../index'; const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }; describe('Unit | eventBuffer | EventBufferArray', () => { - it('adds events to normal event buffer', async function () { - const buffer = createEventBuffer({ useCompression: false }); + for (const recordingMode of ['session', 'error'] as ReplayRecordingMode[]) { + it(`adds events to normal event buffer with recordingMode=${recordingMode}`, async function () { + const buffer = createEventBuffer({ useCompression: false, keepLastCheckout: recordingMode === 'error' }); - buffer.addEvent(TEST_EVENT); - buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); - const result = await buffer.finish(); + const result = await buffer.finish(); - expect(result).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); - }); + expect(result).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); + }); - it('adds checkout event to normal event buffer', async function () { - const buffer = createEventBuffer({ useCompression: false }); + it(`adds checkout event to normal event buffer with recordingMode=${recordingMode}`, async function () { + const buffer = createEventBuffer({ useCompression: false, keepLastCheckout: recordingMode === 'error' }); - buffer.addEvent(TEST_EVENT); - buffer.addEvent(TEST_EVENT); - buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); - buffer.addEvent(TEST_EVENT, true); - const result = await buffer.finish(); + // Checkout triggers clear + buffer.clear(); + buffer.addEvent(TEST_EVENT, true); + buffer.addEvent(TEST_EVENT); + const result = await buffer.finish(); + buffer.addEvent(TEST_EVENT); - expect(result).toEqual(JSON.stringify([TEST_EVENT])); - }); + expect(result).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); + }); - it('calling `finish()` multiple times does not result in duplicated events', async function () { - const buffer = createEventBuffer({ useCompression: false }); + it(`calling \`finish()\` multiple times does not result in duplicated events with recordingMode=${recordingMode}`, async function () { + const buffer = createEventBuffer({ useCompression: false, keepLastCheckout: recordingMode === 'error' }); - buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); - const promise1 = buffer.finish(); - const promise2 = buffer.finish(); + const promise1 = buffer.finish(); + const promise2 = buffer.finish(); - const result1 = (await promise1) as Uint8Array; - const result2 = (await promise2) as Uint8Array; + const result1 = (await promise1) as Uint8Array; + const result2 = (await promise2) as Uint8Array; - expect(result1).toEqual(JSON.stringify([TEST_EVENT])); - expect(result2).toEqual(JSON.stringify([])); - }); + expect(result1).toEqual(JSON.stringify([TEST_EVENT])); + expect(result2).toEqual(JSON.stringify([])); + }); + } }); diff --git a/packages/replay/test/unit/eventBuffer/EventBufferCompressionWorker.test.ts b/packages/replay/test/unit/eventBuffer/EventBufferCompressionWorker.test.ts index 1e22716b553a..40776f72327b 100644 --- a/packages/replay/test/unit/eventBuffer/EventBufferCompressionWorker.test.ts +++ b/packages/replay/test/unit/eventBuffer/EventBufferCompressionWorker.test.ts @@ -3,6 +3,7 @@ import 'jsdom-worker'; import pako from 'pako'; import { BASE_TIMESTAMP } from '../..'; +import { EventBufferCompressionWorker } from '../../../src/eventBuffer/EventBufferCompressionWorker'; import { EventBufferProxy } from '../../../src/eventBuffer/EventBufferProxy'; import { createEventBuffer } from './../../../src/eventBuffer'; @@ -12,9 +13,11 @@ describe('Unit | eventBuffer | EventBufferCompressionWorker', () => { it('adds events to event buffer with compression worker', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferCompressionWorker); // Ensure worker is ready await buffer.ensureWorkerIsLoaded(); @@ -32,9 +35,11 @@ describe('Unit | eventBuffer | EventBufferCompressionWorker', () => { it('adds checkout events to event buffer with compression worker', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferCompressionWorker); // Ensure worker is ready await buffer.ensureWorkerIsLoaded(); @@ -55,9 +60,11 @@ describe('Unit | eventBuffer | EventBufferCompressionWorker', () => { it('calling `finish()` multiple times does not result in duplicated events', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferCompressionWorker); // Ensure worker is ready await buffer.ensureWorkerIsLoaded(); @@ -79,9 +86,11 @@ describe('Unit | eventBuffer | EventBufferCompressionWorker', () => { it('calling `finish()` multiple times, with events in between, does not result in duplicated or dropped events', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferCompressionWorker); // Ensure worker is ready await buffer.ensureWorkerIsLoaded(); @@ -107,9 +116,11 @@ describe('Unit | eventBuffer | EventBufferCompressionWorker', () => { it('handles an error when compressing the payload', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferCompressionWorker); // Ensure worker is ready await buffer.ensureWorkerIsLoaded(); @@ -128,9 +139,11 @@ describe('Unit | eventBuffer | EventBufferCompressionWorker', () => { it('handles an error when adding an event', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferCompressionWorker); // Ensure worker is ready await buffer.ensureWorkerIsLoaded(); diff --git a/packages/replay/test/unit/eventBuffer/EventBufferPartitionedCompressionWorker.test.ts b/packages/replay/test/unit/eventBuffer/EventBufferPartitionedCompressionWorker.test.ts new file mode 100644 index 000000000000..7ceac038fb63 --- /dev/null +++ b/packages/replay/test/unit/eventBuffer/EventBufferPartitionedCompressionWorker.test.ts @@ -0,0 +1,148 @@ +import 'jsdom-worker'; + +import pako from 'pako'; + +import { BASE_TIMESTAMP } from '../..'; +import { createEventBuffer } from '../../../src/eventBuffer'; +import { EventBufferPartitionedCompressionWorker } from '../../../src/eventBuffer/EventBufferPartitionedCompressionWorker'; +import { EventBufferProxy } from '../../../src/eventBuffer/EventBufferProxy'; + +const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }; + +describe('Unit | eventBuffer | EventBufferPartitionedCompressionWorker', () => { + it('adds events to event buffer with compression worker', async function () { + const buffer = createEventBuffer({ + useCompression: true, + keepLastCheckout: true, + }) as EventBufferProxy; + + expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferPartitionedCompressionWorker); + + // Ensure worker is ready + await buffer.ensureWorkerIsLoaded(); + + buffer.addEvent(TEST_EVENT); + buffer.addEvent(TEST_EVENT); + + const result = await buffer.finish(); + expect(result).toBeInstanceOf(Uint8Array); + const restored = pako.inflate(result as Uint8Array, { to: 'string' }); + + expect(restored).toEqual(JSON.stringify([TEST_EVENT, TEST_EVENT])); + }); + + it('adds checkout events to event buffer with compression worker', async function () { + const buffer = createEventBuffer({ + useCompression: true, + keepLastCheckout: true, + }) as EventBufferProxy; + + expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferPartitionedCompressionWorker); + + // Ensure worker is ready + await buffer.ensureWorkerIsLoaded(); + + await buffer.addEvent(TEST_EVENT); + await buffer.addEvent(TEST_EVENT); + + // This should clear previous buffer, but keep last checkout + buffer.clear(true); + await buffer.addEvent({ ...TEST_EVENT, type: 2 }, true); + + await buffer.addEvent(TEST_EVENT); + await buffer.addEvent(TEST_EVENT); + + // This should clear previous buffer, but keep last checkout + buffer.clear(true); + await buffer.addEvent({ ...TEST_EVENT, type: 2 }, true); + + const result = await buffer.finish(); + expect(result).toBeInstanceOf(Uint8Array); + const restored = pako.inflate(result as Uint8Array, { to: 'string' }); + + expect(restored).toEqual( + JSON.stringify([{ ...TEST_EVENT, type: 2 }, TEST_EVENT, TEST_EVENT, { ...TEST_EVENT, type: 2 }]), + ); + }); + + it('calling `finish()` multiple times does not result in duplicated events', async function () { + const buffer = createEventBuffer({ + useCompression: true, + keepLastCheckout: true, + }) as EventBufferProxy; + + expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferPartitionedCompressionWorker); + + // Ensure worker is ready + await buffer.ensureWorkerIsLoaded(); + + buffer.addEvent(TEST_EVENT); + + const promise1 = buffer.finish(); + const promise2 = buffer.finish(); + + const result1 = (await promise1) as Uint8Array; + const result2 = (await promise2) as Uint8Array; + const restored1 = pako.inflate(result1, { to: 'string' }); + const restored2 = pako.inflate(result2, { to: 'string' }); + + expect(restored1).toEqual(JSON.stringify([TEST_EVENT])); + expect(restored2).toEqual(JSON.stringify([])); + }); + + it('calling `finish()` multiple times, with events in between, does not result in duplicated or dropped events', async function () { + const buffer = createEventBuffer({ + useCompression: true, + keepLastCheckout: true, + }) as EventBufferProxy; + + expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferPartitionedCompressionWorker); + + // Ensure worker is ready + await buffer.ensureWorkerIsLoaded(); + + buffer.addEvent(TEST_EVENT); + + const promise1 = buffer.finish(); + await new Promise(process.nextTick); + + buffer.addEvent({ ...TEST_EVENT, type: 5 }); + const promise2 = buffer.finish(); + + const result1 = (await promise1) as Uint8Array; + const result2 = (await promise2) as Uint8Array; + + const restored1 = pako.inflate(result1, { to: 'string' }); + const restored2 = pako.inflate(result2, { to: 'string' }); + + expect(restored1).toEqual(JSON.stringify([TEST_EVENT])); + expect(restored2).toEqual(JSON.stringify([{ ...TEST_EVENT, type: 5 }])); + }); + + it('handles an error when compressing the payload', async function () { + const buffer = createEventBuffer({ + useCompression: true, + keepLastCheckout: true, + }) as EventBufferProxy; + + expect(buffer).toBeInstanceOf(EventBufferProxy); + expect(buffer['_compression']).toBeInstanceOf(EventBufferPartitionedCompressionWorker); + + // Ensure worker is ready + await buffer.ensureWorkerIsLoaded(); + + await buffer.addEvent(TEST_EVENT); + await buffer.addEvent(TEST_EVENT); + + // @ts-ignore Mock this private so it triggers an error + jest.spyOn(buffer._compression._worker, 'postMessage').mockImplementationOnce(() => { + return Promise.reject('test worker error'); + }); + + await expect(() => buffer.finish()).rejects.toBeDefined(); + }); +}); diff --git a/packages/replay/test/unit/eventBuffer/EventBufferProxy.test.ts b/packages/replay/test/unit/eventBuffer/EventBufferProxy.test.ts index c4a98f3c446e..6ed6e91fee5a 100644 --- a/packages/replay/test/unit/eventBuffer/EventBufferProxy.test.ts +++ b/packages/replay/test/unit/eventBuffer/EventBufferProxy.test.ts @@ -23,6 +23,7 @@ describe('Unit | eventBuffer | EventBufferProxy', () => { it('waits for the worker to be loaded when calling finish', async function () { const buffer = createEventBuffer({ useCompression: true, + keepLastCheckout: false, }) as EventBufferProxy; expect(buffer).toBeInstanceOf(EventBufferProxy); @@ -41,7 +42,7 @@ describe('Unit | eventBuffer | EventBufferProxy', () => { const workerBlob = new Blob([workerString]); const workerUrl = URL.createObjectURL(workerBlob); const worker = new Worker(workerUrl); - const buffer = new EventBufferProxy(worker); + const buffer = new EventBufferProxy(worker, false); buffer.addEvent(TEST_EVENT); buffer.addEvent(TEST_EVENT); diff --git a/packages/replay/test/unit/eventBuffer/PartitionedQueue.test.ts b/packages/replay/test/unit/eventBuffer/PartitionedQueue.test.ts new file mode 100644 index 000000000000..8ab9f7f69902 --- /dev/null +++ b/packages/replay/test/unit/eventBuffer/PartitionedQueue.test.ts @@ -0,0 +1,58 @@ +import { PartitionedQueue } from '../../../src/eventBuffer/PartitionedQueue'; + +describe('Unit | eventBuffer | PartitionedQueue', () => { + it('works with empty queue', () => { + const queue = new PartitionedQueue(); + + expect(queue.getItems()).toEqual([]); + expect(queue.getLength()).toEqual(0); + + queue.clear(); + + expect(queue.getItems()).toEqual([]); + expect(queue.getLength()).toEqual(0); + + queue.clear(true); + + expect(queue.getItems()).toEqual([]); + expect(queue.getLength()).toEqual(0); + }); + + it('allows to add records', () => { + const queue = new PartitionedQueue(); + + queue.add('one'); + queue.add('two'); + queue.add('three'); + + expect(queue.getItems()).toEqual(['one', 'two', 'three']); + expect(queue.getLength()).toEqual(3); + + queue.clear(); + + expect(queue.getItems()).toEqual([]); + expect(queue.getLength()).toEqual(0); + }); + + it('allows to add records with checkouts', () => { + const queue = new PartitionedQueue(); + + queue.add('one'); + queue.add('two'); + queue.add('three', true); + queue.add('four'); + + expect(queue.getItems()).toEqual(['one', 'two', 'three', 'four']); + expect(queue.getLength()).toEqual(4); + + queue.clear(true); + + expect(queue.getItems()).toEqual(['three', 'four']); + expect(queue.getLength()).toEqual(2); + + queue.clear(true); + + expect(queue.getItems()).toEqual(['three', 'four']); + expect(queue.getLength()).toEqual(2); + }); +}); diff --git a/packages/replay/test/unit/util/addEvent.test.ts b/packages/replay/test/unit/util/addEvent.test.ts index 6cc2e6b6ffdf..4a7809621d32 100644 --- a/packages/replay/test/unit/util/addEvent.test.ts +++ b/packages/replay/test/unit/util/addEvent.test.ts @@ -1,7 +1,12 @@ import 'jsdom-worker'; +import { inflate } from 'pako'; + import { BASE_TIMESTAMP } from '../..'; -import type { EventBufferProxy } from '../../../src/eventBuffer/EventBufferProxy'; +import type { EventBufferArray } from '../../../src/eventBuffer/EventBufferArray'; +import type { EventBufferCompressionWorker } from '../../../src/eventBuffer/EventBufferCompressionWorker'; +import type { EventBufferPartitionedCompressionWorker } from '../../../src/eventBuffer/EventBufferPartitionedCompressionWorker'; +import { EventBufferProxy } from '../../../src/eventBuffer/EventBufferProxy'; import { addEvent } from '../../../src/util/addEvent'; import { setupReplayContainer } from '../../utils/setupReplayContainer'; import { useFakeTimers } from '../../utils/use-fake-timers'; @@ -29,4 +34,97 @@ describe('Unit | util | addEvent', () => { expect(replay.isEnabled()).toEqual(false); }); + + it.each([ + ['with compression ', true], + ['without compression', false], + ])('clears queue after two checkouts in error mode %s', async (_, useCompression) => { + jest.setSystemTime(BASE_TIMESTAMP); + + const replay = setupReplayContainer({ + options: { useCompression, sessionSampleRate: 0, errorSampleRate: 1 }, + }); + + const _eventBuffer = replay.eventBuffer as EventBufferArray | EventBufferProxy; + let eventBuffer: EventBufferProxy | EventBufferArray = _eventBuffer; + + if (eventBuffer instanceof EventBufferProxy) { + await eventBuffer.ensureWorkerIsLoaded(); + + // @ts-ignore access private + eventBuffer = eventBuffer._compression as EventBufferPartitionedCompressionWorker; + } + + addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 10, type: 2 }, false); + addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 0, type: 3 }); + await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }, true); + + expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP); + expect(eventBuffer.events).toEqual([ + { data: {}, timestamp: BASE_TIMESTAMP + 10, type: 2 }, + { data: {}, timestamp: BASE_TIMESTAMP, type: 3 }, + { data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }, + ]); + + await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 }, true); + + expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP + 100); + expect(eventBuffer.events).toEqual([ + { data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }, + { data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 }, + ]); + + addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 250, type: 3 }, false); + await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 300, type: 2 }, true); + + expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP + 200); + expect(eventBuffer.events).toEqual([ + { data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 }, + { data: {}, timestamp: BASE_TIMESTAMP + 250, type: 3 }, + { data: {}, timestamp: BASE_TIMESTAMP + 300, type: 2 }, + ]); + + const events = await _eventBuffer.finish(); + const eventsString = typeof events === 'string' ? events : inflate(events, { to: 'string' }); + + expect(eventsString).toEqual( + JSON.stringify([ + { data: {}, timestamp: BASE_TIMESTAMP + 200, type: 2 }, + { data: {}, timestamp: BASE_TIMESTAMP + 250, type: 3 }, + { data: {}, timestamp: BASE_TIMESTAMP + 300, type: 2 }, + ]), + ); + }); + + it.each([ + ['with compression ', true], + ['without compression', false], + ])('clears queue after each checkout in session mode %s', async (_, useCompression) => { + jest.setSystemTime(BASE_TIMESTAMP); + + const replay = setupReplayContainer({ + options: { useCompression, sessionSampleRate: 1, errorSampleRate: 0 }, + }); + + const _eventBuffer = replay.eventBuffer!; + let eventBuffer = _eventBuffer; + + if (eventBuffer instanceof EventBufferProxy) { + await eventBuffer.ensureWorkerIsLoaded(); + + // @ts-ignore private api + eventBuffer = eventBuffer._compression as EventBufferCompressionWorker; + } + + addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 10, type: 2 }, false); + addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 0, type: 3 }); + await addEvent(replay, { data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }, true); + + expect(replay.getContext().earliestEvent).toEqual(BASE_TIMESTAMP); + + const events = await _eventBuffer.finish(); + const eventsString = typeof events === 'string' ? events : inflate(events, { to: 'string' }); + + expect(eventsString).toEqual(JSON.stringify([{ data: {}, timestamp: BASE_TIMESTAMP + 100, type: 2 }])); + }); }); diff --git a/packages/replay/test/utils/setupReplayContainer.ts b/packages/replay/test/utils/setupReplayContainer.ts index 15eaa47d5736..697070593e5b 100644 --- a/packages/replay/test/utils/setupReplayContainer.ts +++ b/packages/replay/test/utils/setupReplayContainer.ts @@ -13,8 +13,8 @@ export function setupReplayContainer({ flushMinDelay: 100, flushMaxDelay: 100, stickySession: false, - sessionSampleRate: 0, - errorSampleRate: 1, + sessionSampleRate: 1, + errorSampleRate: 0, useCompression: false, blockAllMedia: true, _experiments: {}, @@ -30,8 +30,10 @@ export function setupReplayContainer({ replay['_setInitialState'](); replay['_loadAndCheckSession'](SESSION_IDLE_DURATION); replay['_isEnabled'] = true; + replay.recordingMode = replay.session?.sampled === 'error' ? 'error' : 'session'; replay.eventBuffer = createEventBuffer({ useCompression: options?.useCompression || false, + keepLastCheckout: replay.recordingMode === 'error', }); return replay;