|
1 | 1 | import { Logger } from "@trigger.dev/core/logger";
|
| 2 | +import { tryCatch } from "@trigger.dev/core/utils"; |
2 | 3 | import { nanoid } from "nanoid";
|
3 | 4 | import pLimit from "p-limit";
|
4 | 5 | import { signalsEmitter } from "~/services/signals.server";
|
@@ -195,47 +196,62 @@ export class DynamicFlushScheduler<T> {
|
195 | 196 | // Schedule all batches for concurrent processing
|
196 | 197 | const flushPromises = batchesToFlush.map((batch) =>
|
197 | 198 | this.limiter(async () => {
|
198 |
| - const flushId = nanoid(); |
199 | 199 | const itemCount = batch.length;
|
200 | 200 |
|
201 |
| - try { |
202 |
| - const startTime = Date.now(); |
203 |
| - await this.callback(flushId, batch); |
204 |
| - |
205 |
| - const duration = Date.now() - startTime; |
206 |
| - this.totalQueuedItems -= itemCount; |
207 |
| - this.consecutiveFlushFailures = 0; |
208 |
| - this.lastFlushTime = Date.now(); |
209 |
| - this.metrics.flushedBatches++; |
210 |
| - this.metrics.totalItemsFlushed += itemCount; |
211 |
| - |
212 |
| - this.logger.debug("Batch flushed successfully", { |
213 |
| - flushId, |
214 |
| - itemCount, |
215 |
| - duration, |
216 |
| - remainingQueueDepth: this.totalQueuedItems, |
217 |
| - activeConcurrency: this.limiter.activeCount, |
218 |
| - pendingConcurrency: this.limiter.pendingCount, |
219 |
| - }); |
220 |
| - } catch (error) { |
221 |
| - this.consecutiveFlushFailures++; |
222 |
| - this.metrics.failedBatches++; |
| 201 | + const self = this; |
| 202 | + |
| 203 | + async function tryFlush(flushId: string, batchToFlush: T[], attempt: number = 1) { |
| 204 | + try { |
| 205 | + const startTime = Date.now(); |
| 206 | + await self.callback(flushId, batchToFlush); |
| 207 | + |
| 208 | + const duration = Date.now() - startTime; |
| 209 | + self.totalQueuedItems -= itemCount; |
| 210 | + self.consecutiveFlushFailures = 0; |
| 211 | + self.lastFlushTime = Date.now(); |
| 212 | + self.metrics.flushedBatches++; |
| 213 | + self.metrics.totalItemsFlushed += itemCount; |
| 214 | + |
| 215 | + self.logger.debug("Batch flushed successfully", { |
| 216 | + flushId, |
| 217 | + itemCount, |
| 218 | + duration, |
| 219 | + remainingQueueDepth: self.totalQueuedItems, |
| 220 | + activeConcurrency: self.limiter.activeCount, |
| 221 | + pendingConcurrency: self.limiter.pendingCount, |
| 222 | + }); |
| 223 | + } catch (error) { |
| 224 | + self.consecutiveFlushFailures++; |
| 225 | + self.metrics.failedBatches++; |
| 226 | + |
| 227 | + self.logger.error("Error attempting to flush batch", { |
| 228 | + flushId, |
| 229 | + itemCount, |
| 230 | + error, |
| 231 | + consecutiveFailures: self.consecutiveFlushFailures, |
| 232 | + attempt, |
| 233 | + }); |
| 234 | + |
| 235 | + // Back off on failures |
| 236 | + if (self.consecutiveFlushFailures > 5) { |
| 237 | + self.adjustConcurrency(true); |
| 238 | + } |
| 239 | + |
| 240 | + if (attempt <= 3) { |
| 241 | + await new Promise((resolve) => setTimeout(resolve, 500)); |
| 242 | + return await tryFlush(flushId, batchToFlush, attempt + 1); |
| 243 | + } else { |
| 244 | + throw error; |
| 245 | + } |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | + const [flushError] = await tryCatch(tryFlush(nanoid(), batch)); |
223 | 250 |
|
| 251 | + if (flushError) { |
224 | 252 | this.logger.error("Error flushing batch", {
|
225 |
| - flushId, |
226 |
| - itemCount, |
227 |
| - error, |
228 |
| - consecutiveFailures: this.consecutiveFlushFailures, |
| 253 | + error: flushError, |
229 | 254 | });
|
230 |
| - |
231 |
| - // Re-queue the batch at the front if it fails |
232 |
| - this.batchQueue.unshift(batch); |
233 |
| - this.totalQueuedItems += itemCount; |
234 |
| - |
235 |
| - // Back off on failures |
236 |
| - if (this.consecutiveFlushFailures > 3) { |
237 |
| - this.adjustConcurrency(true); |
238 |
| - } |
239 | 255 | }
|
240 | 256 | })
|
241 | 257 | );
|
|
0 commit comments