11import type { ReplayRecordingData } from '@sentry/types' ;
2- import { logger } from '@sentry/utils' ;
32
4- import type { AddEventResult , EventBuffer , RecordingEvent , WorkerRequest , WorkerResponse } from '../types' ;
3+ import type { AddEventResult , EventBuffer , RecordingEvent } from '../types' ;
4+ import { WorkerHandler } from './WorkerHandler' ;
55
66/**
77 * Event buffer that uses a web worker to compress events.
88 * Exported only for testing.
99 */
1010export class EventBufferCompressionWorker implements EventBuffer {
11- /**
12- * Keeps track of the list of events since the last flush that have not been compressed.
13- * For example, page is reloaded and a flush attempt is made, but
14- * `finish()` (and thus the flush), does not complete.
15- */
16- public _pendingEvents : RecordingEvent [ ] = [ ] ;
11+ /** @inheritdoc */
12+ public hasEvents : boolean ;
1713
18- private _worker : Worker ;
19- private _eventBufferItemLength : number = 0 ;
20- private _id : number = 0 ;
21- private _ensureReadyPromise ?: Promise < void > ;
14+ private _worker : WorkerHandler ;
2215
2316 public constructor ( worker : Worker ) {
24- this . _worker = worker ;
25- }
26-
27- /**
28- * The number of raw events that are buffered. This may not be the same as
29- * the number of events that have been compresed in the worker because
30- * `addEvent` is async.
31- */
32- public get pendingLength ( ) : number {
33- return this . _eventBufferItemLength ;
34- }
35-
36- /**
37- * Returns a list of the raw recording events that are being compressed.
38- */
39- public get pendingEvents ( ) : RecordingEvent [ ] {
40- return this . _pendingEvents ;
17+ this . _worker = new WorkerHandler ( worker ) ;
18+ this . hasEvents = false ;
4119 }
4220
4321 /**
4422 * Ensure the worker is ready (or not).
4523 * This will either resolve when the worker is ready, or reject if an error occured.
4624 */
4725 public ensureReady ( ) : Promise < void > {
48- // Ensure we only check once
49- if ( this . _ensureReadyPromise ) {
50- return this . _ensureReadyPromise ;
51- }
52-
53- this . _ensureReadyPromise = new Promise ( ( resolve , reject ) => {
54- this . _worker . addEventListener (
55- 'message' ,
56- ( { data } : MessageEvent ) => {
57- if ( ( data as WorkerResponse ) . success ) {
58- resolve ( ) ;
59- } else {
60- reject ( ) ;
61- }
62- } ,
63- { once : true } ,
64- ) ;
65-
66- this . _worker . addEventListener (
67- 'error' ,
68- error => {
69- reject ( error ) ;
70- } ,
71- { once : true } ,
72- ) ;
73- } ) ;
74-
75- return this . _ensureReadyPromise ;
26+ return this . _worker . ensureReady ( ) ;
7627 }
7728
7829 /**
7930 * Destroy the event buffer.
8031 */
8132 public destroy ( ) : void {
82- __DEBUG_BUILD__ && logger . log ( '[Replay] Destroying compression worker' ) ;
83- this . _worker . terminate ( ) ;
33+ this . _worker . destroy ( ) ;
8434 }
8535
8636 /**
@@ -89,19 +39,12 @@ export class EventBufferCompressionWorker implements EventBuffer {
8939 * Returns true if event was successfuly received and processed by worker.
9040 */
9141 public async addEvent ( event : RecordingEvent , isCheckout ?: boolean ) : Promise < AddEventResult > {
42+ this . hasEvents = true ;
43+
9244 if ( isCheckout ) {
9345 // This event is a checkout, make sure worker buffer is cleared before
9446 // proceeding.
95- await this . _postMessage ( {
96- id : this . _getAndIncrementId ( ) ,
97- method : 'init' ,
98- args : [ ] ,
99- } ) ;
100- }
101-
102- // Don't store checkout events in `_pendingEvents` because they are too large
103- if ( ! isCheckout ) {
104- this . _pendingEvents . push ( event ) ;
47+ await this . _clear ( ) ;
10548 }
10649
10750 return this . _sendEventToWorker ( event ) ;
@@ -110,97 +53,30 @@ export class EventBufferCompressionWorker implements EventBuffer {
11053 /**
11154 * Finish the event buffer and return the compressed data.
11255 */
113- public async finish ( ) : Promise < ReplayRecordingData > {
114- try {
115- return await this . _finishRequest ( this . _getAndIncrementId ( ) ) ;
116- } catch ( error ) {
117- __DEBUG_BUILD__ && logger . error ( '[Replay] Error when trying to compress events' , error ) ;
118- // fall back to uncompressed
119- const events = this . pendingEvents ;
120- return JSON . stringify ( events ) ;
121- }
122- }
123-
124- /**
125- * Post message to worker and wait for response before resolving promise.
126- */
127- private _postMessage < T > ( { id, method, args } : WorkerRequest ) : Promise < T > {
128- return new Promise ( ( resolve , reject ) => {
129- const listener = ( { data } : MessageEvent ) : void => {
130- const response = data as WorkerResponse ;
131- if ( response . method !== method ) {
132- return ;
133- }
134-
135- // There can be multiple listeners for a single method, the id ensures
136- // that the response matches the caller.
137- if ( response . id !== id ) {
138- return ;
139- }
140-
141- // At this point, we'll always want to remove listener regardless of result status
142- this . _worker . removeEventListener ( 'message' , listener ) ;
143-
144- if ( ! response . success ) {
145- // TODO: Do some error handling, not sure what
146- __DEBUG_BUILD__ && logger . error ( '[Replay]' , response . response ) ;
147-
148- reject ( new Error ( 'Error in compression worker' ) ) ;
149- return ;
150- }
151-
152- resolve ( response . response as T ) ;
153- } ;
154-
155- let stringifiedArgs ;
156- try {
157- stringifiedArgs = JSON . stringify ( args ) ;
158- } catch ( err ) {
159- __DEBUG_BUILD__ && logger . error ( '[Replay] Error when trying to stringify args' , err ) ;
160- stringifiedArgs = '[]' ;
161- }
162-
163- // Note: we can't use `once` option because it's possible it needs to
164- // listen to multiple messages
165- this . _worker . addEventListener ( 'message' , listener ) ;
166- this . _worker . postMessage ( { id, method, args : stringifiedArgs } ) ;
167- } ) ;
56+ public finish ( ) : Promise < ReplayRecordingData > {
57+ return this . _finishRequest ( ) ;
16858 }
16959
17060 /**
17161 * Send the event to the worker.
17262 */
173- private async _sendEventToWorker ( event : RecordingEvent ) : Promise < AddEventResult > {
174- const promise = this . _postMessage < void > ( {
175- id : this . _getAndIncrementId ( ) ,
176- method : 'addEvent' ,
177- args : [ event ] ,
178- } ) ;
179-
180- // XXX: See note in `get length()`
181- this . _eventBufferItemLength ++ ;
182-
183- return promise ;
63+ private _sendEventToWorker ( event : RecordingEvent ) : Promise < AddEventResult > {
64+ return this . _worker . postMessage < void > ( 'addEvent' , JSON . stringify ( event ) ) ;
18465 }
18566
18667 /**
18768 * Finish the request and return the compressed data from the worker.
18869 */
189- private async _finishRequest ( id : number ) : Promise < Uint8Array > {
190- const promise = this . _postMessage < Uint8Array > ( { id, method : 'finish' , args : [ ] } ) ;
191-
192- // XXX: See note in `get length()`
193- this . _eventBufferItemLength = 0 ;
194-
195- await promise ;
70+ private async _finishRequest ( ) : Promise < Uint8Array > {
71+ const response = await this . _worker . postMessage < Uint8Array > ( 'finish' ) ;
19672
197- this . _pendingEvents = [ ] ;
73+ this . hasEvents = false ;
19874
199- return promise ;
75+ return response ;
20076 }
20177
202- /** Get the current ID and increment it for the next call . */
203- private _getAndIncrementId ( ) : number {
204- return this . _id ++ ;
78+ /** Clear any pending events from the worker . */
79+ private _clear ( ) : Promise < void > {
80+ return this . _worker . postMessage ( 'clear' ) ;
20581 }
20682}
0 commit comments