From c128819d2ae64d74a205dcf50322f5d240382429 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 11 Aug 2025 17:19:48 +0100 Subject: [PATCH 1/4] The logger now supports metadata --- internal-packages/run-engine/src/engine/errors.ts | 3 ++- packages/core/src/logger.ts | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 9e7c4c40c8..e4ff702ee0 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -59,7 +59,8 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus { export class ServiceValidationError extends Error { constructor( message: string, - public status?: number + public status?: number, + public metadata?: Record ) { super(message); this.name = "ServiceValidationError"; diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index 7b0de5db09..1e7a811bcb 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -147,6 +147,7 @@ function extractStructuredErrorFromArgs(...args: Array | message: error.message, stack: error.stack, name: error.name, + metadata: "metadata" in error ? error.metadata : undefined, }; } @@ -157,6 +158,7 @@ function extractStructuredErrorFromArgs(...args: Array | message: structuredError.error.message, stack: structuredError.error.stack, name: structuredError.error.name, + metadata: "metadata" in structuredError.error ? structuredError.error.metadata : undefined, }; } From 73fb41317d0e39c587f365ea08356113eb5b4e66 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 11 Aug 2025 17:20:48 +0100 Subject: [PATCH 2/4] Added metadata to ServiceValidationError in some critical places --- .../src/engine/systems/checkpointSystem.ts | 18 ++++++++++++++++-- .../src/engine/systems/runAttemptSystem.ts | 11 +++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index 5c6cf2aa57..bec173d960 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -270,11 +270,25 @@ export class CheckpointSystem { const snapshot = await getLatestExecutionSnapshot(prisma, runId); if (snapshot.id !== snapshotId) { - throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400); + throw new ServiceValidationError( + "Snapshot ID doesn't match the latest snapshot in continueRunExecution", + 400, + { + snapshotId, + latestSnapshotId: snapshot.id, + } + ); } if (!isPendingExecuting(snapshot.executionStatus)) { - throw new ServiceValidationError("Snapshot is not in a valid state to continue", 400); + throw new ServiceValidationError( + "Snapshot is not in a valid state to continue in continueRunExecution", + 400, + { + snapshotId, + snapshotStatus: snapshot.executionStatus, + } + ); } // Get the run and update the status diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index e8c78b2174..942c22f6be 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -318,9 +318,16 @@ export class RunAttemptSystem { //if there is a big delay between the snapshot and the attempt, the snapshot might have changed //we just want to log because elsewhere it should have been put back into a state where it can be attempted this.$.logger.warn( - "RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring." + "RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring.", + { + snapshotId, + latestSnapshotId: latestSnapshot.id, + } ); - throw new ServiceValidationError("Snapshot changed", 409); + throw new ServiceValidationError("Snapshot changed inside startRunAttempt", 409, { + snapshotId, + latestSnapshotId: latestSnapshot.id, + }); } const taskRun = await prisma.taskRun.findFirst({ From 8c11a6a22c08a7170ff5535069cf2500c7f90c35 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 11 Aug 2025 17:21:43 +0100 Subject: [PATCH 3/4] Don't ack the heartbeat if there's a mismatch, it might prevent a brand new one --- .../run-engine/src/engine/systems/executionSnapshotSystem.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index a9d5600aea..4e5628d2de 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -361,7 +361,6 @@ export class ExecutionSnapshotSystem { runnerId, }); - await this.$.worker.ack(`heartbeatSnapshot.${runId}`); return executionResultFromSnapshot(latestSnapshot); } From 4ffcc427e7cf992774b2a119394a6c65c8ff3753 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 11 Aug 2025 17:24:43 +0100 Subject: [PATCH 4/4] Don't ack the heartbeat inside stalled. By returning it will be acked IF the deduplication key matches --- internal-packages/run-engine/src/engine/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 7657431140..20ef7d53e8 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1155,7 +1155,6 @@ export class RunEngine { } ); - await this.worker.ack(`heartbeatSnapshot.${runId}`); return; }