11import type { Envelope , InternalBaseTransportOptions , Transport , TransportMakeRequestResponse } from '@sentry/types' ;
2- import { logger } from '@sentry/utils' ;
2+ import { forEachEnvelopeItem , logger , parseRetryAfterHeader } from '@sentry/utils' ;
33
4- export const START_DELAY = 5_000 ;
5- const MAX_DELAY = 2_000_000_000 ;
4+ export const MIN_DELAY = 100 ; // 100 ms
5+ export const START_DELAY = 5_000 ; // 5 seconds
6+ const MAX_DELAY = 3.6e6 ; // 1 hour
67const DEFAULT_QUEUE_SIZE = 30 ;
78
8- function wasRateLimited ( result : TransportMakeRequestResponse ) : boolean {
9- return ! ! ( result . headers && result . headers [ 'x-sentry-rate-limits' ] ) ;
10- }
9+ function isReplayEnvelope ( envelope : Envelope ) : boolean {
10+ let isReplay = false ;
11+
12+ forEachEnvelopeItem ( envelope , ( _ , type ) => {
13+ if ( type === 'replay_event' ) {
14+ isReplay = true ;
15+ }
16+ } ) ;
1117
12- type BeforeSendResponse = 'send' | 'queue' | 'drop' ;
18+ return isReplay ;
19+ }
1320
1421interface OfflineTransportOptions extends InternalBaseTransportOptions {
1522 /**
16- * The maximum number of events to keep in the offline queue .
23+ * The maximum number of events to keep in the offline store .
1724 *
1825 * Defaults: 30
1926 */
2027 maxQueueSize ?: number ;
2128
2229 /**
23- * Flush the offline queue shortly after startup.
30+ * Flush the offline store shortly after startup.
2431 *
2532 * Defaults: false
2633 */
2734 flushAtStartup ?: boolean ;
2835
2936 /**
30- * Called when an event is queued .
31- */
32- eventQueued ?: ( ) => void ;
33-
34- /**
35- * Called before attempting to send an event to Sentry.
37+ * Called before an event is stored.
3638 *
37- * Return 'send' to attempt to send the event.
38- * Return 'queue' to queue the event for sending later.
39- * Return 'drop' to drop the event.
39+ * Return false to drop the envelope rather than store it.
40+ *
41+ * @param envelope The envelope that failed to send.
42+ * @param error The error that occurred.
43+ * @param retryDelay The current retry delay in milliseconds.
4044 */
41- beforeSend ?: ( request : Envelope ) => BeforeSendResponse | Promise < BeforeSendResponse > ;
45+ shouldStore ?: ( envelope : Envelope , error : Error , retryDelay : number ) => boolean | Promise < boolean > ;
4246}
4347
4448interface OfflineStore {
@@ -48,8 +52,10 @@ interface OfflineStore {
4852
4953export type CreateOfflineStore = ( maxQueueCount : number ) => OfflineStore ;
5054
55+ type Timer = number | { unref ?: ( ) => void } ;
56+
5157/**
52- * Wraps a transport and queues events when envelopes fail to send.
58+ * Wraps a transport and stores and retries events when they fail to send.
5359 *
5460 * @param createTransport The transport to wrap.
5561 * @param createStore The store implementation to use.
@@ -59,88 +65,104 @@ export function makeOfflineTransport<TO>(
5965 createStore : CreateOfflineStore ,
6066) : ( options : TO & OfflineTransportOptions ) => Transport {
6167 return options => {
62- const baseTransport = createTransport ( options ) ;
68+ const transport = createTransport ( options ) ;
6369 const maxQueueSize = options . maxQueueSize === undefined ? DEFAULT_QUEUE_SIZE : options . maxQueueSize ;
6470 const store = createStore ( maxQueueSize ) ;
6571
6672 let retryDelay = START_DELAY ;
73+ let flushTimer : Timer | undefined ;
6774
68- function queued ( ) : void {
69- if ( options . eventQueued ) {
70- options . eventQueued ( ) ;
71- }
75+ function log ( msg : string , error ?: Error ) : void {
76+ __DEBUG_BUILD__ && logger . info ( `[Offline]: ${ msg } ` , error ) ;
7277 }
7378
74- function queueRequest ( envelope : Envelope ) : Promise < void > {
75- return store . insert ( envelope ) . then ( ( ) => {
76- queued ( ) ;
79+ function shouldQueue ( env : Envelope , error : Error , retryDelay : number ) : boolean | Promise < boolean > {
80+ if ( isReplayEnvelope ( env ) ) {
81+ return false ;
82+ }
7783
78- setTimeout ( ( ) => {
79- void flushQueue ( ) ;
80- } , retryDelay ) ;
84+ if ( options . shouldStore ) {
85+ return options . shouldStore ( env , error , retryDelay ) ;
86+ }
8187
82- retryDelay *= 3 ;
88+ return true ;
89+ }
8390
84- // If the delay is bigger than 2^31 (max signed 32-bit int), setTimeout throws
85- // an error on node.js and falls back to 1 which can cause a huge number of requests.
86- if ( retryDelay > MAX_DELAY ) {
87- retryDelay = MAX_DELAY ;
91+ function flushIn ( delay : number ) : void {
92+ if ( flushTimer ) {
93+ clearTimeout ( flushTimer as ReturnType < typeof setTimeout > ) ;
94+ }
95+
96+ flushTimer = setTimeout ( async ( ) => {
97+ flushTimer = undefined ;
98+
99+ const found = await store . pop ( ) ;
100+ if ( found ) {
101+ log ( 'Attempting to send previously queued event' ) ;
102+ void send ( found ) . catch ( e => {
103+ log ( 'Failed to retry sending' , e ) ;
104+ } ) ;
88105 }
89- } ) ;
106+ } , delay ) as Timer ;
107+
108+ // We need to unref the timer in node.js, otherwise the node process never exit.
109+ if ( typeof flushTimer !== 'number' && typeof flushTimer . unref === 'function' ) {
110+ flushTimer . unref ( ) ;
111+ }
90112 }
91113
92- async function flushQueue ( ) : Promise < void > {
93- const found = await store . pop ( ) ;
114+ function flushWithBackOff ( ) : void {
115+ if ( flushTimer ) {
116+ return ;
117+ }
118+
119+ flushIn ( retryDelay ) ;
120+
121+ retryDelay *= 2 ;
94122
95- if ( found ) {
96- __DEBUG_BUILD__ && logger . info ( '[Offline]: Attempting to send previously queued event' ) ;
97- void send ( found ) ;
123+ if ( retryDelay > MAX_DELAY ) {
124+ retryDelay = MAX_DELAY ;
98125 }
99126 }
100127
101- async function send ( request : Envelope ) : Promise < void | TransportMakeRequestResponse > {
102- let action = 'send' ;
128+ async function send ( envelope : Envelope ) : Promise < void | TransportMakeRequestResponse > {
129+ try {
130+ const result = await transport . send ( envelope ) ;
103131
104- if ( options . beforeSend ) {
105- action = await options . beforeSend ( request ) ;
106- }
132+ let delay = MIN_DELAY ;
107133
108- if ( action === 'send' ) {
109- try {
110- const result = await baseTransport . send ( request ) ;
111- if ( wasRateLimited ( result || { } ) ) {
112- __DEBUG_BUILD__ && logger . info ( '[Offline]: Event queued due to rate limiting' ) ;
113- action = 'queue' ;
114- } else {
115- // Envelope was successfully sent
116- // Reset the retry delay
117- retryDelay = START_DELAY ;
118- // Check if there are any more in the queue
119- void flushQueue ( ) ;
134+ if ( result ) {
135+ // If there's a retry-after header, use that as the next delay.
136+ if ( result . headers && result . headers [ 'retry-after' ] ) {
137+ delay = parseRetryAfterHeader ( result . headers [ 'retry-after' ] ) ;
138+ } // If we have a server error, return now so we don't flush the queue.
139+ else if ( ( result . statusCode || 0 ) >= 400 ) {
120140 return result ;
121141 }
122- } catch ( e ) {
123- __DEBUG_BUILD__ && logger . info ( '[Offline]: Event queued due to error' , e ) ;
124- action = 'queue' ;
125142 }
126- }
127143
128- if ( action == 'queue' ) {
129- void queueRequest ( request ) ;
144+ flushIn ( delay ) ;
145+ retryDelay = START_DELAY ;
146+ return result ;
147+ } catch ( e ) {
148+ if ( await shouldQueue ( envelope , e , retryDelay ) ) {
149+ await store . insert ( envelope ) ;
150+ flushWithBackOff ( ) ;
151+ log ( 'Error sending. Event queued' , e ) ;
152+ return { } ;
153+ } else {
154+ throw e ;
155+ }
130156 }
131-
132- return { } ;
133157 }
134158
135159 if ( options . flushAtStartup ) {
136- setTimeout ( ( ) => {
137- void flushQueue ( ) ;
138- } , retryDelay ) ;
160+ flushWithBackOff ( ) ;
139161 }
140162
141163 return {
142164 send,
143- flush : ( timeout ?: number ) => baseTransport . flush ( timeout ) ,
165+ flush : ( timeout ?: number ) => transport . flush ( timeout ) ,
144166 } ;
145167 } ;
146168}
0 commit comments