diff --git a/packages/integration-tests/suites/replay/captureReplay/test.ts b/packages/integration-tests/suites/replay/captureReplay/test.ts index 80a3f52201dd..51a1d900f9ed 100644 --- a/packages/integration-tests/suites/replay/captureReplay/test.ts +++ b/packages/integration-tests/suites/replay/captureReplay/test.ts @@ -23,7 +23,7 @@ sentryTest('captureReplay', async ({ getLocalTestPath, page }) => { await page.goto(url); await page.click('button'); - await page.waitForTimeout(200); + await page.waitForTimeout(300); const replayEvent = await getFirstSentryEnvelopeRequest(page, url); diff --git a/packages/replay/src/replay.ts b/packages/replay/src/replay.ts index e82908defa58..06a8ae58f64a 100644 --- a/packages/replay/src/replay.ts +++ b/packages/replay/src/replay.ts @@ -1,7 +1,8 @@ /* eslint-disable max-lines */ // TODO: We might want to split this file up import { addGlobalEventProcessor, captureException, getCurrentHub, setContext } from '@sentry/core'; import type { Breadcrumb, ReplayEvent, ReplayRecordingMode, TransportMakeRequestResponse } from '@sentry/types'; -import { addInstrumentationHandler, logger } from '@sentry/utils'; +import type { RateLimits } from '@sentry/utils'; +import { addInstrumentationHandler, disabledUntil, isRateLimited, logger, updateRateLimits } from '@sentry/utils'; import { EventType, record } from 'rrweb'; import { @@ -128,6 +129,11 @@ export class ReplayContainer implements ReplayContainerInterface { initialUrl: '', }; + /** + * A RateLimits object holding the rate-limit durations in case a sent replay event was rate-limited. + */ + private _rateLimits: RateLimits = {}; + public constructor({ options, recordingOptions, @@ -988,7 +994,15 @@ export class ReplayContainer implements ReplayContainerInterface { const envelope = createReplayEnvelope(replayEvent, recordingData, dsn, client.getOptions().tunnel); try { - return await transport.send(envelope); + const response = await transport.send(envelope); + // TODO (v8): we can remove this guard once transport.send's type signature doesn't include void anymore + if (response) { + this._rateLimits = updateRateLimits(this._rateLimits, response); + if (isRateLimited(this._rateLimits, 'replay')) { + this._handleRateLimit(); + } + } + return response; } catch { throw new Error(UNABLE_TO_SEND_REPLAY); } @@ -1040,9 +1054,8 @@ export class ReplayContainer implements ReplayContainerInterface { throw new Error(`${UNABLE_TO_SEND_REPLAY} - max retries exceeded`); } - this._retryCount = this._retryCount + 1; // will retry in intervals of 5, 10, 30 - this._retryInterval = this._retryCount * this._retryInterval; + this._retryInterval = ++this._retryCount * this._retryInterval; return await new Promise((resolve, reject) => { setTimeout(async () => { @@ -1069,4 +1082,29 @@ export class ReplayContainer implements ReplayContainerInterface { saveSession(this.session); } } + + /** + * Pauses the replay and resumes it after the rate-limit duration is over. + */ + private _handleRateLimit(): void { + // in case recording is already paused, we don't need to do anything, as we might have already paused because of a + // rate limit + if (this.isPaused()) { + return; + } + + const rateLimitEnd = disabledUntil(this._rateLimits, 'replay'); + const rateLimitDuration = rateLimitEnd - Date.now(); + + if (rateLimitDuration > 0) { + __DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`); + this.pause(); + this._debouncedFlush && this._debouncedFlush.cancel(); + + setTimeout(() => { + __DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit'); + this.resume(); + }, rateLimitDuration); + } + } } diff --git a/packages/replay/test/integration/rateLimiting.test.ts b/packages/replay/test/integration/rateLimiting.test.ts new file mode 100644 index 000000000000..1270d7c9d780 --- /dev/null +++ b/packages/replay/test/integration/rateLimiting.test.ts @@ -0,0 +1,301 @@ +import { getCurrentHub } from '@sentry/core'; +import type { Transport, TransportMakeRequestResponse } from '@sentry/types'; + +import { DEFAULT_FLUSH_MIN_DELAY, SESSION_IDLE_DURATION } from '../../src/constants'; +import type { ReplayContainer } from '../../src/replay'; +import { BASE_TIMESTAMP, mockSdk } from '../index'; +import { mockRrweb } from '../mocks/mockRrweb'; +import { clearSession } from '../utils/clearSession'; +import { useFakeTimers } from '../utils/use-fake-timers'; + +useFakeTimers(); + +async function advanceTimers(time: number) { + jest.advanceTimersByTime(time); + await new Promise(process.nextTick); +} + +type MockTransportSend = jest.MockedFunction; +type MockSendReplayRequest = jest.MockedFunction; + +describe('Integration | rate-limiting behaviour', () => { + let replay: ReplayContainer; + let mockTransportSend: MockTransportSend; + let mockSendReplayRequest: MockSendReplayRequest; + const { record: mockRecord } = mockRrweb(); + + beforeAll(async () => { + jest.setSystemTime(new Date(BASE_TIMESTAMP)); + + ({ replay } = await mockSdk({ + replayOptions: { + stickySession: false, + }, + })); + + // @ts-ignore private API + jest.spyOn(replay, '_sendReplayRequest'); + + jest.runAllTimers(); + mockTransportSend = getCurrentHub()?.getClient()?.getTransport()?.send as MockTransportSend; + mockSendReplayRequest = replay['_sendReplayRequest'] as MockSendReplayRequest; + }); + + beforeEach(() => { + jest.setSystemTime(new Date(BASE_TIMESTAMP)); + mockRecord.takeFullSnapshot.mockClear(); + mockTransportSend.mockClear(); + + // Create a new session and clear mocks because a segment (from initial + // checkout) will have already been uploaded by the time the tests run + clearSession(replay); + replay['_loadSession']({ expiry: 0 }); + + mockSendReplayRequest.mockClear(); + + replay['_rateLimits'] = {}; + }); + + afterEach(async () => { + jest.runAllTimers(); + await new Promise(process.nextTick); + jest.setSystemTime(new Date(BASE_TIMESTAMP)); + clearSession(replay); + jest.clearAllMocks(); + replay['_loadSession']({ expiry: SESSION_IDLE_DURATION }); + }); + + afterAll(() => { + replay && replay.stop(); + }); + + it.each([ + { + statusCode: 429, + headers: { + 'x-sentry-rate-limits': '30', + 'retry-after': null, + }, + }, + { + statusCode: 429, + headers: { + 'x-sentry-rate-limits': '30:replay', + 'retry-after': null, + }, + }, + { + statusCode: 429, + headers: { + 'x-sentry-rate-limits': null, + 'retry-after': '30', + }, + }, + ] as TransportMakeRequestResponse[])( + 'pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over', + async rateLimitResponse => { + expect(replay.session?.segmentId).toBe(0); + jest.spyOn(replay, 'pause'); + jest.spyOn(replay, 'resume'); + // @ts-ignore private API + jest.spyOn(replay, '_handleRateLimit'); + // @ts-ignore private API + jest.spyOn(replay, '_sendReplay'); + + const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 }; + + mockTransportSend.mockImplementationOnce(() => { + return Promise.resolve(rateLimitResponse); + }); + + mockRecord._emitter(TEST_EVENT); + + // T = base + 5 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + + expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled(); + expect(mockTransportSend).toHaveBeenCalledTimes(1); + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) }); + + expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1); + // resume() was called once before we even started + expect(replay.resume).not.toHaveBeenCalled(); + expect(replay.pause).toHaveBeenCalledTimes(1); + + // No user activity to trigger an update + expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP); + expect(replay.session?.segmentId).toBe(1); + + // let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime + const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 }; + for (let i = 0; i < 5; i++) { + const ev = { + ...TEST_EVENT2, + timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1), + }; + mockRecord._emitter(ev); + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + expect(replay.isPaused()).toBe(true); + expect(replay['_sendReplay']).toHaveBeenCalledTimes(1); + expect(mockTransportSend).toHaveBeenCalledTimes(1); + } + + // T = base + 35 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + + // now, recording should resume and first, we expect a checkout event to be sent, as resume() + // should trigger a full snapshot + expect(replay.resume).toHaveBeenCalledTimes(1); + expect(replay.isPaused()).toBe(false); + + expect(replay['_sendReplay']).toHaveBeenCalledTimes(2); + expect(replay).toHaveLastSentReplay({ + events: JSON.stringify([ + { data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 7, type: 2 }, + ]), + }); + + // and let's also emit a new event and check that it is recorded + const TEST_EVENT3 = { + data: {}, + timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY, + type: 3, + }; + mockRecord._emitter(TEST_EVENT3); + + // T = base + 40 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + expect(replay['_sendReplay']).toHaveBeenCalledTimes(3); + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) }); + + // nothing should happen afterwards + // T = base + 60 + await advanceTimers(20_000); + expect(replay['_sendReplay']).toHaveBeenCalledTimes(3); + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) }); + + // events array should be empty + expect(replay.eventBuffer?.pendingLength).toBe(0); + }, + ); + + it('handles rate-limits from a plain 429 response without any retry time', async () => { + expect(replay.session?.segmentId).toBe(0); + jest.spyOn(replay, 'pause'); + jest.spyOn(replay, 'resume'); + // @ts-ignore private API + jest.spyOn(replay, '_handleRateLimit'); + // @ts-ignore private API + jest.spyOn(replay, '_sendReplay'); + + const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 }; + + mockTransportSend.mockImplementationOnce(() => { + return Promise.resolve({ statusCode: 429 }); + }); + + mockRecord._emitter(TEST_EVENT); + + // T = base + 5 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + + expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled(); + expect(mockTransportSend).toHaveBeenCalledTimes(1); + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) }); + + expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1); + // resume() was called once before we even started + expect(replay.resume).not.toHaveBeenCalled(); + expect(replay.pause).toHaveBeenCalledTimes(1); + + // No user activity to trigger an update + expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP); + expect(replay.session?.segmentId).toBe(1); + + // let's simulate the rate-limit time of inactivity (60secs) and check that we don't do anything in the meantime + // 60secs are the default we fall back to in the plain 429 case in updateRateLimits() + const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 }; + for (let i = 0; i < 11; i++) { + const ev = { + ...TEST_EVENT2, + timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1), + }; + mockRecord._emitter(ev); + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + expect(replay.isPaused()).toBe(true); + expect(replay['_sendReplay']).toHaveBeenCalledTimes(1); + expect(mockTransportSend).toHaveBeenCalledTimes(1); + } + + // T = base + 60 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + + // now, recording should resume and first, we expect a checkout event to be sent, as resume() + // should trigger a full snapshot + expect(replay.resume).toHaveBeenCalledTimes(1); + expect(replay.isPaused()).toBe(false); + + expect(replay['_sendReplay']).toHaveBeenCalledTimes(2); + expect(replay).toHaveLastSentReplay({ + events: JSON.stringify([ + { data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 13, type: 2 }, + ]), + }); + + // and let's also emit a new event and check that it is recorded + const TEST_EVENT3 = { + data: {}, + timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY, + type: 3, + }; + mockRecord._emitter(TEST_EVENT3); + + // T = base + 65 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + expect(replay['_sendReplay']).toHaveBeenCalledTimes(3); + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) }); + + // nothing should happen afterwards + // T = base + 85 + await advanceTimers(20_000); + expect(replay['_sendReplay']).toHaveBeenCalledTimes(3); + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) }); + + // events array should be empty + expect(replay.eventBuffer?.pendingLength).toBe(0); + }); + + it("doesn't do anything, if a rate limit is hit and recording is already paused", async () => { + let paused = false; + expect(replay.session?.segmentId).toBe(0); + jest.spyOn(replay, 'isPaused').mockImplementation(() => { + return paused; + }); + jest.spyOn(replay, 'pause'); + jest.spyOn(replay, 'resume'); + // @ts-ignore private API + jest.spyOn(replay, '_handleRateLimit'); + + const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 }; + + mockTransportSend.mockImplementationOnce(() => { + return Promise.resolve({ statusCode: 429 }); + }); + + mockRecord._emitter(TEST_EVENT); + paused = true; + + // T = base + 5 + await advanceTimers(DEFAULT_FLUSH_MIN_DELAY); + + expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled(); + expect(mockTransportSend).toHaveBeenCalledTimes(1); + + expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) }); + + expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1); + expect(replay.resume).not.toHaveBeenCalled(); + expect(replay.isPaused).toHaveBeenCalledTimes(2); + expect(replay.pause).not.toHaveBeenCalled(); + }); +});