Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ sentryTest('captureReplay', async ({ getLocalTestPath, page }) => {
await page.goto(url);

await page.click('button');
await page.waitForTimeout(200);
await page.waitForTimeout(300);

const replayEvent = await getFirstSentryEnvelopeRequest<Event>(page, url);

Expand Down
46 changes: 42 additions & 4 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* eslint-disable max-lines */ // TODO: We might want to split this file up
import { addGlobalEventProcessor, captureException, getCurrentHub, setContext } from '@sentry/core';
import type { Breadcrumb, ReplayEvent, ReplayRecordingMode, TransportMakeRequestResponse } from '@sentry/types';
import { addInstrumentationHandler, logger } from '@sentry/utils';
import type { RateLimits } from '@sentry/utils';
import { addInstrumentationHandler, disabledUntil, isRateLimited, logger, updateRateLimits } from '@sentry/utils';
import { EventType, record } from 'rrweb';

import {
Expand Down Expand Up @@ -128,6 +129,11 @@ export class ReplayContainer implements ReplayContainerInterface {
initialUrl: '',
};

/**
* A RateLimits object holding the rate-limit durations in case a sent replay event was rate-limited.
*/
private _rateLimits: RateLimits = {};

public constructor({
options,
recordingOptions,
Expand Down Expand Up @@ -988,7 +994,15 @@ export class ReplayContainer implements ReplayContainerInterface {
const envelope = createReplayEnvelope(replayEvent, recordingData, dsn, client.getOptions().tunnel);

try {
return await transport.send(envelope);
const response = await transport.send(envelope);
// TODO (v8): we can remove this guard once transport.send's type signature doesn't include void anymore
if (response) {
this._rateLimits = updateRateLimits(this._rateLimits, response);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what's the reason to keep this on the class instance? Could we not just pass this through, e.g.:

const rateLimits = updateRateLimits({}, response);
// ...
this._handleRateLimits(rateLimits);

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added _rateLimits to the class as I was initially worried about multiple pending requests which would both return different rate limit responses. Also, we keep the rateLimits object alive across multiple requests in the base transport.

But I guess the risk here is minimal, so I'm happy to change it to a local variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I am also not 100% sure. I guess if there are multiple pending requests, they would just both result in the _handleRateLimits being called, right? Maybe we should add some logic there to handle this. Thinking of either:

a) Check if isPaused() === true, and if so, do nothing
b) Cancel the previous timeout, if it exists, and create a new one

I'd say a) is the easier one - and it should still work if we run into the rate limit again, as it will just re-pause it then. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are multiple pending requests, they would just both result in the _handleRateLimits being called, right?

Good point!

I'd say a) is the easier one

Agreed, will do this!

if (isRateLimited(this._rateLimits, 'replay')) {
this._handleRateLimit();
}
}
return response;
} catch {
throw new Error(UNABLE_TO_SEND_REPLAY);
}
Expand Down Expand Up @@ -1040,9 +1054,8 @@ export class ReplayContainer implements ReplayContainerInterface {
throw new Error(`${UNABLE_TO_SEND_REPLAY} - max retries exceeded`);
}

this._retryCount = this._retryCount + 1;
// will retry in intervals of 5, 10, 30
this._retryInterval = this._retryCount * this._retryInterval;
this._retryInterval = ++this._retryCount * this._retryInterval;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice optimization 👍


return await new Promise((resolve, reject) => {
setTimeout(async () => {
Expand All @@ -1069,4 +1082,29 @@ export class ReplayContainer implements ReplayContainerInterface {
saveSession(this.session);
}
}

/**
* Pauses the replay and resumes it after the rate-limit duration is over.
*/
private _handleRateLimit(): void {
// in case recording is already paused, we don't need to do anything, as we might have already paused because of a
// rate limit
if (this.isPaused()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice and easy 👍

return;
}

const rateLimitEnd = disabledUntil(this._rateLimits, 'replay');
const rateLimitDuration = rateLimitEnd - Date.now();

if (rateLimitDuration > 0) {
__DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`);
this.pause();
this._debouncedFlush && this._debouncedFlush.cancel();

setTimeout(() => {
__DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit');
this.resume();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity cc @billyvg - when we resume, do we automatically do a full checkout? I am not totally sure, as we just call rrweb's record there, basically.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least that's what the JSdoc of startRecording tells me 😅

/**
* Start recording.
*
* Note that this will cause a new DOM checkout
*/
startRecording(): void {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point :D

}, rateLimitDuration);
}
}
}
301 changes: 301 additions & 0 deletions packages/replay/test/integration/rateLimiting.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
import { getCurrentHub } from '@sentry/core';
import type { Transport, TransportMakeRequestResponse } from '@sentry/types';

import { DEFAULT_FLUSH_MIN_DELAY, SESSION_IDLE_DURATION } from '../../src/constants';
import type { ReplayContainer } from '../../src/replay';
import { BASE_TIMESTAMP, mockSdk } from '../index';
import { mockRrweb } from '../mocks/mockRrweb';
import { clearSession } from '../utils/clearSession';
import { useFakeTimers } from '../utils/use-fake-timers';

useFakeTimers();

async function advanceTimers(time: number) {
jest.advanceTimersByTime(time);
await new Promise(process.nextTick);
}

type MockTransportSend = jest.MockedFunction<Transport['send']>;
type MockSendReplayRequest = jest.MockedFunction<ReplayContainer['_sendReplayRequest']>;

describe('Integration | rate-limiting behaviour', () => {
let replay: ReplayContainer;
let mockTransportSend: MockTransportSend;
let mockSendReplayRequest: MockSendReplayRequest;
const { record: mockRecord } = mockRrweb();

beforeAll(async () => {
jest.setSystemTime(new Date(BASE_TIMESTAMP));

({ replay } = await mockSdk({
replayOptions: {
stickySession: false,
},
}));

// @ts-ignore private API
jest.spyOn(replay, '_sendReplayRequest');

jest.runAllTimers();
mockTransportSend = getCurrentHub()?.getClient()?.getTransport()?.send as MockTransportSend;
mockSendReplayRequest = replay['_sendReplayRequest'] as MockSendReplayRequest;
});

beforeEach(() => {
jest.setSystemTime(new Date(BASE_TIMESTAMP));
mockRecord.takeFullSnapshot.mockClear();
mockTransportSend.mockClear();

// Create a new session and clear mocks because a segment (from initial
// checkout) will have already been uploaded by the time the tests run
clearSession(replay);
replay['_loadSession']({ expiry: 0 });

mockSendReplayRequest.mockClear();

replay['_rateLimits'] = {};
});

afterEach(async () => {
jest.runAllTimers();
await new Promise(process.nextTick);
jest.setSystemTime(new Date(BASE_TIMESTAMP));
clearSession(replay);
jest.clearAllMocks();
replay['_loadSession']({ expiry: SESSION_IDLE_DURATION });
});

afterAll(() => {
replay && replay.stop();
});

it.each([
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30:replay',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': null,
'retry-after': '30',
},
},
] as TransportMakeRequestResponse[])(
'pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over',
async rateLimitResponse => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');
// @ts-ignore private API
jest.spyOn(replay, '_sendReplay');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve(rateLimitResponse);
});

mockRecord._emitter(TEST_EVENT);

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 5; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(replay['_sendReplay']).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 35
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);

expect(replay['_sendReplay']).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
events: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 7, type: 2 },
]),
});

// and let's also emit a new event and check that it is recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 40
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 60
await advanceTimers(20_000);
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.pendingLength).toBe(0);
},
);

it('handles rate-limits from a plain 429 response without any retry time', async () => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');
// @ts-ignore private API
jest.spyOn(replay, '_sendReplay');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve({ statusCode: 429 });
});

mockRecord._emitter(TEST_EVENT);

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (60secs) and check that we don't do anything in the meantime
// 60secs are the default we fall back to in the plain 429 case in updateRateLimits()
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 11; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(replay['_sendReplay']).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 60
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);

expect(replay['_sendReplay']).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
events: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 13, type: 2 },
]),
});

// and let's also emit a new event and check that it is recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 65
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 85
await advanceTimers(20_000);
expect(replay['_sendReplay']).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it("doesn't do anything, if a rate limit is hit and recording is already paused", async () => {
let paused = false;
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'isPaused').mockImplementation(() => {
return paused;
});
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve({ statusCode: 429 });
});

mockRecord._emitter(TEST_EVENT);
paused = true;

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockTransportSend).toHaveBeenCalledTimes(1);

expect(replay).toHaveLastSentReplay({ events: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.isPaused).toHaveBeenCalledTimes(2);
expect(replay.pause).not.toHaveBeenCalled();
});
});