diff --git a/.changeset/angry-trainers-perform.md b/.changeset/angry-trainers-perform.md new file mode 100644 index 0000000000..ac468823eb --- /dev/null +++ b/.changeset/angry-trainers-perform.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Stop failing attempt spans when a run is cancelled diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a00ffa3f3c..7919e075b2 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -251,6 +251,7 @@ export class SpanPresenter extends BasePresenter { engine: run.engine, region, workerQueue: run.workerQueue, + traceId: run.traceId, spanId: run.spanId, isCached: !!span.originalRun, machinePreset: machine?.name, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 6a4e29476e..054c53f777 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -772,6 +772,14 @@ function RunBody({ Worker queue {run.workerQueue} + + Trace ID + {run.traceId} + + + Span ID + {run.spanId} + )} diff --git a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts index ac7099357d..3080ae871a 100644 --- a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts +++ b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts @@ -90,11 +90,18 @@ export class IdempotencyKeyConcern { isError: associatedWaitpoint.outputIsError, }, async (event) => { + const spanId = + request.options?.parentAsLinkType === "replay" + ? event.spanId + : event.traceparent?.spanId + ? `${event.traceparent.spanId}:${event.spanId}` + : event.spanId; + //block run with waitpoint await this.engine.blockRunWithWaitpoint({ runId: RunId.fromFriendlyId(parentRunId), waitpoints: associatedWaitpoint.id, - spanIdToComplete: event.spanId, + spanIdToComplete: spanId, batch: request.options?.batchId ? { id: request.options.batchId, diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index a839c3423c..6a94c5b219 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -2,8 +2,8 @@ import { Attributes, AttributeValue, Link, trace, TraceFlags, Tracer } from "@op import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base"; import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions"; import { + AttemptFailedSpanEvent, correctErrorStackTrace, - createPacketAttributesAsJson, ExceptionEventProperties, ExceptionSpanEvent, flattenAttributes, @@ -21,7 +21,7 @@ import { unflattenAttributes, } from "@trigger.dev/core/v3"; import { parseTraceparent, serializeTraceparent } from "@trigger.dev/core/v3/isomorphic"; -import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database"; +import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus, TaskRun } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import { createHash } from "node:crypto"; import { EventEmitter } from "node:stream"; @@ -54,6 +54,22 @@ export type CreatableEventKind = TaskEventKind; export type CreatableEventStatus = TaskEventStatus; export type CreatableEventEnvironmentType = CreatableEvent["environmentType"]; +export type CompleteableTaskRun = Pick< + TaskRun, + | "friendlyId" + | "traceId" + | "spanId" + | "parentSpanId" + | "createdAt" + | "completedAt" + | "taskIdentifier" + | "projectId" + | "runtimeEnvironmentId" + | "organizationId" + | "environmentType" + | "isTest" +>; + export type TraceAttributes = Partial< Pick< CreatableEvent, @@ -137,6 +153,7 @@ export type QueriedEvent = Prisma.TaskEventGetPayload<{ events: true; environmentType: true; kind: true; + attemptNumber: true; }; }>; @@ -290,78 +307,286 @@ export class EventRepository { return await this.#flushBatch(nanoid(), events); } - async completeEvent( - storeTable: TaskEventStoreTable, - spanId: string, - startCreatedAt: Date, - endCreatedAt?: Date, - options?: UpdateEventOptions - ) { - const events = await this.queryIncompleteEvents( - storeTable, - { spanId }, - startCreatedAt, - endCreatedAt - ); + async completeSuccessfulRunEvent({ run, endTime }: { run: CompleteableTaskRun; endTime?: Date }) { + const startTime = convertDateToNanoseconds(run.createdAt); - if (events.length === 0) { - logger.warn("No incomplete events found for spanId", { spanId, options }); - return; - } + await this.insertImmediate({ + message: run.taskIdentifier, + serviceName: "api server", + serviceNamespace: "trigger.dev", + level: "TRACE", + kind: "SERVER", + traceId: run.traceId, + spanId: run.spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + taskSlug: run.taskIdentifier, + projectRef: "", + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType ?? "DEVELOPMENT", + organizationId: run.organizationId ?? "", + isPartial: false, + isError: false, + isCancelled: false, + status: "OK", + runIsTest: run.isTest, + startTime, + properties: {}, + metadata: undefined, + style: undefined, + duration: calculateDurationFromStart(startTime, endTime ?? new Date()), + output: undefined, + payload: undefined, + payloadType: undefined, + }); + } - const event = events[0]; + async completeCachedRunEvent({ + run, + blockedRun, + endTime, + spanId, + parentSpanId, + spanCreatedAt, + isError, + }: { + run: CompleteableTaskRun; + blockedRun: CompleteableTaskRun; + spanId: string; + parentSpanId: string; + spanCreatedAt: Date; + isError: boolean; + endTime?: Date; + }) { + const startTime = convertDateToNanoseconds(spanCreatedAt); - const output = options?.attributes.output - ? await createPacketAttributesAsJson( - options?.attributes.output, - options?.attributes.outputType ?? "application/json" - ) - : undefined; + await this.insertImmediate({ + message: run.taskIdentifier, + serviceName: "api server", + serviceNamespace: "trigger.dev", + level: "TRACE", + kind: "SERVER", + traceId: blockedRun.traceId, + spanId: spanId, + parentId: parentSpanId, + runId: blockedRun.friendlyId, + taskSlug: run.taskIdentifier, + projectRef: "", + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType ?? "DEVELOPMENT", + organizationId: run.organizationId ?? "", + isPartial: false, + isError, + isCancelled: false, + status: "OK", + runIsTest: run.isTest, + startTime, + properties: {}, + metadata: undefined, + style: undefined, + duration: calculateDurationFromStart(startTime, endTime ?? new Date()), + output: undefined, + payload: undefined, + payloadType: undefined, + }); + } - logger.debug("Completing event", { - spanId, - eventId: event.id, + async completeFailedRunEvent({ + run, + endTime, + exception, + }: { + run: CompleteableTaskRun; + endTime?: Date; + exception: { message?: string; type?: string; stacktrace?: string }; + }) { + const startTime = convertDateToNanoseconds(run.createdAt); + + await this.insertImmediate({ + message: run.taskIdentifier, + serviceName: "api server", + serviceNamespace: "trigger.dev", + level: "TRACE", + kind: "SERVER", + traceId: run.traceId, + spanId: run.spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + taskSlug: run.taskIdentifier, + projectRef: "", + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType ?? "DEVELOPMENT", + organizationId: run.organizationId ?? "", + isPartial: false, + isError: true, + isCancelled: false, + status: "ERROR", + runIsTest: run.isTest, + startTime, + events: [ + { + name: "exception", + time: endTime ?? new Date(), + properties: { + exception, + }, + }, + ], + properties: {}, + metadata: undefined, + style: undefined, + duration: calculateDurationFromStart(startTime, endTime ?? new Date()), + output: undefined, + payload: undefined, + payloadType: undefined, }); + } - const completedEvent = { - ...omit(event, "id"), + async completeExpiredRunEvent({ + run, + endTime, + ttl, + }: { + run: CompleteableTaskRun; + endTime?: Date; + ttl: string; + }) { + const startTime = convertDateToNanoseconds(run.createdAt); + + await this.insertImmediate({ + message: run.taskIdentifier, + serviceName: "api server", + serviceNamespace: "trigger.dev", + level: "TRACE", + kind: "SERVER", + traceId: run.traceId, + spanId: run.spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + taskSlug: run.taskIdentifier, + projectRef: "", + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType ?? "DEVELOPMENT", + organizationId: run.organizationId ?? "", isPartial: false, - isError: options?.attributes.isError ?? false, + isError: true, isCancelled: false, - status: options?.attributes.isError ? "ERROR" : "OK", - links: event.links ?? [], - events: event.events ?? (options?.events as any) ?? [], - duration: calculateDurationFromStart(event.startTime, options?.endTime), - properties: event.properties as Attributes, - metadata: event.metadata as Attributes, - style: event.style as Attributes, - output: output, - outputType: - options?.attributes.outputType === "application/store" || - options?.attributes.outputType === "text/plain" - ? options?.attributes.outputType - : "application/json", - payload: event.payload as Attributes, - payloadType: event.payloadType, - } satisfies CreatableEvent; + status: "ERROR", + runIsTest: run.isTest, + startTime, + events: [ + { + name: "exception", + time: endTime ?? new Date(), + properties: { + exception: { + message: `Run expired because the TTL (${ttl}) was reached`, + }, + }, + }, + ], + properties: {}, + metadata: undefined, + style: undefined, + duration: calculateDurationFromStart(startTime, endTime ?? new Date()), + output: undefined, + payload: undefined, + payloadType: undefined, + }); + } - await this.insert(completedEvent); + async createAttemptFailedRunEvent({ + run, + endTime, + attemptNumber, + exception, + }: { + run: CompleteableTaskRun; + endTime?: Date; + attemptNumber: number; + exception: { message?: string; type?: string; stacktrace?: string }; + }) { + const startTime = convertDateToNanoseconds(run.createdAt); - return completedEvent; + await this.insertImmediate({ + message: run.taskIdentifier, + serviceName: "api server", + serviceNamespace: "trigger.dev", + level: "TRACE", + kind: "UNSPECIFIED", // This will be treated as an "invisible" event + traceId: run.traceId, + spanId: run.spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + taskSlug: run.taskIdentifier, + projectRef: "", + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType ?? "DEVELOPMENT", + organizationId: run.organizationId ?? "", + isPartial: true, + isError: false, + isCancelled: false, + status: "OK", + runIsTest: run.isTest, + startTime, + events: [ + { + name: "attempt_failed", + time: endTime ?? new Date(), + properties: { + exception, + attemptNumber, + runId: run.friendlyId, + }, + } satisfies AttemptFailedSpanEvent, + ], + properties: {}, + metadata: undefined, + style: undefined, + duration: calculateDurationFromStart(startTime, endTime ?? new Date()), + output: undefined, + payload: undefined, + payloadType: undefined, + }); } - async cancelEvent(event: TaskEventRecord, cancelledAt: Date, reason: string) { - if (!event.isPartial) { - return; - } + async cancelRunEvent({ + reason, + run, + cancelledAt, + }: { + reason: string; + run: CompleteableTaskRun; + cancelledAt: Date; + }) { + const startTime = convertDateToNanoseconds(run.createdAt); await this.insertImmediate({ - ...omit(event, "id"), + message: run.taskIdentifier, + serviceName: "api server", + serviceNamespace: "trigger.dev", + level: "TRACE", + kind: "SERVER", + traceId: run.traceId, + spanId: run.spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + taskSlug: run.taskIdentifier, + projectRef: "", + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType ?? "DEVELOPMENT", + organizationId: run.organizationId ?? "", isPartial: false, - isError: false, + isError: true, isCancelled: true, status: "ERROR", - links: event.links ?? [], + runIsTest: run.isTest, events: [ { name: "cancellation", @@ -370,56 +595,18 @@ export class EventRepository { reason, }, }, - ...((event.events as any[]) ?? []), ], - duration: calculateDurationFromStart(event.startTime, cancelledAt), - properties: event.properties as Attributes, - metadata: event.metadata as Attributes, - style: event.style as Attributes, - output: event.output as Attributes, - outputType: event.outputType, - payload: event.payload as Attributes, - payloadType: event.payloadType, + startTime, + properties: {}, + metadata: undefined, + style: undefined, + duration: calculateDurationFromStart(startTime, cancelledAt), + output: undefined, + payload: undefined, + payloadType: undefined, }); } - async cancelEvents(events: TaskEventRecord[], cancelledAt: Date, reason: string) { - const eventsToCancel = events.filter((event) => event.isPartial); - - if (eventsToCancel.length === 0) { - return; - } - - await this.insertMany( - eventsToCancel.map((event) => ({ - ...omit(event, "id"), - isPartial: false, - isError: false, - isCancelled: true, - status: "ERROR", - links: event.links ?? [], - events: [ - { - name: "cancellation", - time: cancelledAt, - properties: { - reason, - }, - }, - ...((event.events as any[]) ?? []), - ], - duration: calculateDurationFromStart(event.startTime, cancelledAt), - properties: event.properties as Attributes, - metadata: event.metadata as Attributes, - style: event.style as Attributes, - output: event.output as Attributes, - outputType: event.outputType, - payload: event.payload as Attributes, - payloadType: event.payloadType, - })) - ); - } - async crashEvent({ event, crashedAt, @@ -461,75 +648,6 @@ export class EventRepository { }); } - async #queryEvents( - storeTable: TaskEventStoreTable, - queryOptions: QueryOptions, - startCreatedAt: Date, - endCreatedAt?: Date - ): Promise { - return await this.taskEventStore.findMany( - storeTable, - queryOptions, - startCreatedAt, - endCreatedAt - ); - } - - async queryIncompleteEvents( - storeTable: TaskEventStoreTable, - queryOptions: QueryOptions, - startCreatedAt: Date, - endCreatedAt?: Date, - allowCompleteDuplicate = false - ) { - // First we will find all the events that match the query options (selecting minimal data). - const taskEvents = await this.taskEventStore.findMany( - storeTable, - queryOptions, - startCreatedAt, - endCreatedAt, - { spanId: true, isPartial: true, isCancelled: true }, - undefined, - { limit: 500 } - ); - - // Optimize the filtering by pre-processing the data - const completeEventSpanIds = new Set(); - const incompleteEvents: Array<{ spanId: string }> = []; - - // Single pass to categorize events and build lookup structures - for (const event of taskEvents) { - if (!event.isPartial && !event.isCancelled) { - // This is a complete event - completeEventSpanIds.add(event.spanId); - } else if (event.isPartial && !event.isCancelled) { - // This is a potentially incomplete event - incompleteEvents.push(event); - } - // Skip cancelled events as they are not incomplete - } - - // Filter incomplete events, excluding those with complete duplicates - const filteredTaskEvents = allowCompleteDuplicate - ? incompleteEvents - : incompleteEvents.filter((event) => !completeEventSpanIds.has(event.spanId)); - - if (filteredTaskEvents.length === 0) { - return []; - } - - return this.#queryEvents( - storeTable, - { - spanId: { - in: filteredTaskEvents.map((event) => event.spanId), - }, - }, - startCreatedAt, - endCreatedAt - ); - } - public async getTraceSummary( storeTable: TaskEventStoreTable, traceId: string, @@ -566,8 +684,25 @@ export class EventRepository { continue; } + // This is an invisible event, and we just want to keep the original event but concat together + // the event.events with the existingEvent.events + if (event.kind === "UNSPECIFIED") { + eventsBySpanId.set(event.spanId, { + ...existingEvent, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }); + continue; + } + if (event.isCancelled || !event.isPartial) { - eventsBySpanId.set(event.spanId, event); + const mergedEvent: PreparedEvent = { + ...event, + // Preserve style from the original partial event + style: existingEvent.style, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }; + eventsBySpanId.set(event.spanId, mergedEvent); + continue; } } @@ -576,12 +711,23 @@ export class EventRepository { const spansBySpanId = new Map(); const spans = preparedEvents.map((event) => { - const ancestorCancelled = isAncestorCancelled(eventsBySpanId, event.spanId); - const duration = calculateDurationIfAncestorIsCancelled( - eventsBySpanId, - event.spanId, - event.duration - ); + const overrides = getAncestorOverrides({ + spansById: eventsBySpanId, + span: event, + }); + + const ancestorCancelled = overrides?.isCancelled ?? false; + const ancestorIsError = overrides?.isError ?? false; + const duration = overrides?.duration ?? event.duration; + const events = [...(overrides?.events ?? []), ...(event.events ?? [])]; + const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; + const isCancelled = + event.isCancelled === true ? true : event.isPartial && ancestorCancelled; + const isError = isCancelled + ? false + : typeof overrides?.isError === "boolean" + ? overrides.isError + : event.isError; const span = { id: event.spanId, @@ -592,13 +738,13 @@ export class EventRepository { message: event.message, style: event.style, duration, - isError: event.isError, - isPartial: ancestorCancelled ? false : event.isPartial, - isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled, + isError, + isPartial, + isCancelled, isDebug: event.kind === TaskEventKind.LOG, startTime: getDateFromNanoseconds(event.startTime), level: event.level, - events: event.events, + events, environmentType: event.environmentType, }, }; @@ -661,8 +807,29 @@ export class EventRepository { continue; } + // This is an invisible event, and we just want to keep the original event but concat together + // the event.events with the existingEvent.events + if (event.kind === "UNSPECIFIED") { + eventsBySpanId.set(event.spanId, { + ...existingEvent, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }); + continue; + } + if (event.isCancelled || !event.isPartial) { - eventsBySpanId.set(event.spanId, event); + // If we have a cancelled event and an existing partial event, + // merge them: use cancelled event data but preserve style from the partial event + if (event.isCancelled && existingEvent.isPartial && !existingEvent.isCancelled) { + const mergedEvent: PreparedDetailedEvent = { + ...event, // Use cancelled event as base (has correct timing, status, events) + // Preserve style from the original partial event + style: existingEvent.style, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }; + eventsBySpanId.set(event.spanId, mergedEvent); + continue; + } } } @@ -677,12 +844,23 @@ export class EventRepository { // First pass: create all span detailed summaries for (const event of preparedEvents) { - const ancestorCancelled = isAncestorCancelled(eventsBySpanId, event.spanId); - const duration = calculateDurationIfAncestorIsCancelled( - eventsBySpanId, - event.spanId, - event.duration - ); + const overrides = getAncestorOverrides({ + spansById: eventsBySpanId, + span: event, + }); + + const ancestorCancelled = overrides?.isCancelled ?? false; + const ancestorIsError = overrides?.isError ?? false; + const duration = overrides?.duration ?? event.duration; + const events = [...(overrides?.events ?? []), ...(event.events ?? [])]; + const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; + const isCancelled = + event.isCancelled === true ? true : event.isPartial && ancestorCancelled; + const isError = isCancelled + ? false + : typeof overrides?.isError === "boolean" + ? overrides.isError + : event.isError; const output = event.output ? (event.output as Attributes) : undefined; const properties = event.properties @@ -697,12 +875,12 @@ export class EventRepository { runId: event.runId, taskSlug: event.taskSlug ?? undefined, taskPath: event.taskPath ?? undefined, - events: event.events?.filter((e) => !e.name.startsWith("trigger.dev")), + events: events?.filter((e) => !e.name.startsWith("trigger.dev")), startTime: getDateFromNanoseconds(event.startTime), duration: nanosecondsToMilliseconds(duration), - isError: event.isError, - isPartial: ancestorCancelled ? false : event.isPartial, - isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled, + isError, + isPartial, + isCancelled, level: event.level, environmentType: event.environmentType, workerVersion: event.workerVersion ?? undefined, @@ -772,12 +950,17 @@ export class EventRepository { events: true, environmentType: true, taskSlug: true, + attemptNumber: true, } ); let preparedEvents: Array = []; for (const event of events) { + if (event.kind === "UNSPECIFIED") { + continue; + } + preparedEvents.push(prepareEvent(event)); } @@ -862,7 +1045,7 @@ export class EventRepository { } const spanEvents = transformEvents( - preparedEvent.events, + span.data.events, spanEvent.metadata as Attributes, spanEvent.environmentType === "DEVELOPMENT" ); @@ -902,8 +1085,7 @@ export class EventRepository { endCreatedAt?: Date ) { return await startActiveSpan("createSpanFromEvent", async (s) => { - let ancestorCancelled = false; - let duration = event.duration; + let overrides: AncestorOverrides | undefined; if (!event.isCancelled && event.isPartial) { await this.#walkSpanAncestors( @@ -917,7 +1099,9 @@ export class EventRepository { } if (ancestorEvent.isCancelled) { - ancestorCancelled = true; + overrides = { + isCancelled: true, + }; // We need to get the cancellation time from the cancellation span event const cancellationEvent = ancestorEvent.events.find( @@ -925,17 +1109,56 @@ export class EventRepository { ); if (cancellationEvent) { - duration = calculateDurationFromStart(event.startTime, cancellationEvent.time); + overrides.duration = calculateDurationFromStart( + event.startTime, + cancellationEvent.time + ); } return { stop: true }; } + const attemptFailedEvent = (ancestorEvent.events ?? []).find( + (spanEvent) => + spanEvent.name === "attempt_failed" && + spanEvent.properties.attemptNumber === event.attemptNumber + ); + + if (!attemptFailedEvent) { + return { stop: false }; + } + + overrides = { + isError: true, + events: [ + { + name: "exception", + time: attemptFailedEvent.time, + properties: { + exception: (attemptFailedEvent as AttemptFailedSpanEvent).properties.exception, + }, + }, + ], + duration: calculateDurationFromStart(event.startTime, attemptFailedEvent.time), + }; + return { stop: false }; } ); } + const ancestorCancelled = overrides?.isCancelled ?? false; + const ancestorIsError = overrides?.isError ?? false; + const duration = overrides?.duration ?? event.duration; + const events = [...(overrides?.events ?? []), ...(event.events ?? [])]; + const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; + const isCancelled = event.isCancelled === true ? true : event.isPartial && ancestorCancelled; + const isError = isCancelled + ? false + : typeof overrides?.isError === "boolean" + ? overrides.isError + : event.isError; + const span = { id: event.spanId, parentId: event.parentId ?? undefined, @@ -945,12 +1168,12 @@ export class EventRepository { message: event.message, style: event.style, duration, - isError: event.isError, - isPartial: ancestorCancelled ? false : event.isPartial, - isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled, + isError, + isPartial, + isCancelled, startTime: getDateFromNanoseconds(event.startTime), level: event.level, - events: event.events, + events, environmentType: event.environmentType, }, }; @@ -1026,19 +1249,80 @@ export class EventRepository { ); let finalEvent: TaskEvent | undefined; + let overrideEvents: TaskEvent[] = []; + let partialEvent: TaskEvent | undefined; + // Separate partial and final events for (const event of events) { - if (event.isPartial && finalEvent) { + if (event.kind === "UNSPECIFIED") { + overrideEvents.push(event); continue; } - finalEvent = event; + if (event.isPartial) { + // Take the first partial event (earliest) + if (!partialEvent) { + partialEvent = event; + } + } else { + // Take the last complete/cancelled event (most recent) + finalEvent = event; + } + } + + // If we have both partial and final events, merge them intelligently + if (finalEvent && partialEvent) { + return this.#mergeOverrides( + this.#mergePartialWithFinalEvent(partialEvent, finalEvent), + overrideEvents + ); } - return finalEvent; + // Return whichever event we have + return this.#mergeOverrides(finalEvent ?? partialEvent, overrideEvents); }); } + /** + * Merges a partial event with a final (complete/cancelled) event. + * Uses the final event as base but fills in missing fields from the partial event. + */ + #mergePartialWithFinalEvent(partialEvent: TaskEvent, finalEvent: TaskEvent): TaskEvent { + const merged = { + ...finalEvent, // Use final event as base + // Override with partial event fields only if final event fields are missing/empty + properties: isEmpty(finalEvent.properties) ? partialEvent.properties : finalEvent.properties, + metadata: isEmpty(finalEvent.metadata) ? partialEvent.metadata : finalEvent.metadata, + style: isEmpty(finalEvent.style) ? partialEvent.style : finalEvent.style, + output: isEmpty(finalEvent.output) ? partialEvent.output : finalEvent.output, + payload: isEmpty(finalEvent.payload) ? partialEvent.payload : finalEvent.payload, + payloadType: !finalEvent.payloadType ? partialEvent.payloadType : finalEvent.payloadType, + }; + + return merged; + } + + #mergeOverrides( + event: TaskEvent | undefined, + overrideEvents: TaskEvent[] + ): TaskEvent | undefined { + function extractEventsFromEvent(event: TaskEvent): SpanEvent[] { + return (event.events ?? []) as unknown as SpanEvent[]; + } + + if (!event) { + return; + } + + return { + ...event, + events: [ + ...extractEventsFromEvent(event), + ...overrideEvents.flatMap(extractEventsFromEvent), + ] as unknown as Prisma.JsonValue, + }; + } + public async recordEvent( message: string, options: TraceEventOptions & { duration?: number; parentId?: string } @@ -1141,11 +1425,6 @@ export class EventRepository { ): Promise { const propagatedContext = extractContextFromCarrier(options.context ?? {}); - logger.debug("[otelContext]", { - propagatedContext, - options, - }); - const start = process.hrtime.bigint(); const startTime = options.startTime ?? getNowInNanoseconds(); @@ -1708,98 +1987,128 @@ function parseStyleField(style: Prisma.JsonValue): TaskEventStyle { return {}; } -function isAncestorCancelled( - events: Map, - spanId: string -) { - const event = events.get(spanId); +type AncestorOverrides = { + isCancelled?: boolean; + duration?: number; + isError?: boolean; + events?: SpanEvents; +}; + +function getAncestorOverrides({ + spansById, + span, +}: { + spansById: Map; + span: PreparedEvent; +}): AncestorOverrides | undefined { + const overrides: AncestorOverrides = {}; - if (!event) { - return false; + if (span.level !== "TRACE") { + return; } - if (event.isCancelled) { - return true; + const cancelledAncestor = findCancelledAncestor(spansById, span, span.spanId); + + if (cancelledAncestor) { + overrides.isCancelled = true; + + // We need to get the cancellation time from the cancellation span event + const cancellationEvent = cancelledAncestor.events.find( + (event) => event.name === "cancellation" + ); + + if (cancellationEvent) { + overrides.duration = calculateDurationFromStart(span.startTime, cancellationEvent.time); + } + + return overrides; } - if (event.parentId) { - return isAncestorCancelled(events, event.parentId); + const attemptFailedAncestorEvent = findAttemptFailedAncestor(spansById, span, span.spanId); + + if (attemptFailedAncestorEvent) { + overrides.isError = true; + overrides.events = [ + { + name: "exception", + time: attemptFailedAncestorEvent.time, + properties: { + exception: attemptFailedAncestorEvent.properties.exception, + }, + } satisfies ExceptionSpanEvent, + ]; + overrides.duration = calculateDurationFromStart( + span.startTime, + attemptFailedAncestorEvent.time + ); + + return overrides; } - return false; + return; } -function calculateDurationIfAncestorIsCancelled( - events: Map< - string, - { - isCancelled: boolean; - parentId: string | null; - isPartial: boolean; - startTime: bigint; - events: SpanEvents; - } - >, - spanId: string, - defaultDuration: number +function findCancelledAncestor( + spansById: Map, + originalSpan: PreparedEvent, + spanId?: string | null ) { - const event = events.get(spanId); - - if (!event) { - return defaultDuration; + if (!spanId) { + return; } - if (event.isCancelled) { - return defaultDuration; + if (originalSpan.spanId === spanId) { + return findCancelledAncestor(spansById, originalSpan, originalSpan.parentId); } - if (!event.isPartial) { - return defaultDuration; - } + const ancestorSpan = spansById.get(spanId); - if (event.parentId) { - const cancelledAncestor = findFirstCancelledAncestor(events, event.parentId); + if (!ancestorSpan) { + return; + } - if (cancelledAncestor) { - // We need to get the cancellation time from the cancellation span event - const cancellationEvent = cancelledAncestor.events.find( - (event) => event.name === "cancellation" - ); + if (ancestorSpan.isCancelled) { + return ancestorSpan; + } - if (cancellationEvent) { - return calculateDurationFromStart(event.startTime, cancellationEvent.time); - } - } + if (ancestorSpan.parentId) { + return findCancelledAncestor(spansById, originalSpan, ancestorSpan.parentId); } - return defaultDuration; + return; } -function findFirstCancelledAncestor( - events: Map< - string, - { - isCancelled: boolean; - parentId: string | null; - isPartial: boolean; - startTime: bigint; - events: SpanEvents; - } - >, - spanId: string +function findAttemptFailedAncestor( + spansById: Map, + originalSpan: PreparedEvent, + spanId?: string | null ) { - const event = events.get(spanId); + if (!spanId) { + return; + } + + if (originalSpan.spanId === spanId) { + return findAttemptFailedAncestor(spansById, originalSpan, originalSpan.parentId); + } + + const ancestorSpan = spansById.get(spanId); - if (!event) { + if (!ancestorSpan) { return; } - if (event.isCancelled) { - return event; + const attemptFailedEvent = (ancestorSpan.events ?? []).find( + (event) => + event.name === "attempt_failed" && + event.properties.attemptNumber === originalSpan.attemptNumber + ); + + if (attemptFailedEvent) { + return attemptFailedEvent as AttemptFailedSpanEvent; } - if (event.parentId) { - return findFirstCancelledAncestor(events, event.parentId); + if (ancestorSpan.parentId) { + return findAttemptFailedAncestor(spansById, originalSpan, ancestorSpan.parentId); } return; @@ -1895,6 +2204,10 @@ export function getDateFromNanoseconds(nanoseconds: bigint) { return new Date(Number(nanoseconds) / 1_000_000); } +function convertDateToNanoseconds(date: Date) { + return BigInt(date.getTime()) * BigInt(1_000_000); +} + function nanosecondsToMilliseconds(nanoseconds: bigint | number): number { return Number(nanoseconds) / 1_000_000; } @@ -2149,3 +2462,19 @@ function isRetriablePrismaError( return false; } + +function isEmptyObject(obj: object) { + for (var prop in obj) { + if (Object.prototype.hasOwnProperty.call(obj, prop)) { + return false; + } + } + + return true; +} +// Helper function to check if a field is empty/missing +function isEmpty(value: any): boolean { + if (value === null || value === undefined) return true; + if (typeof value === "object" && !Array.isArray(value) && isEmptyObject(value)) return true; + return false; +} diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index f40f4b0176..726cd68ff7 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -1,59 +1,66 @@ -import { $replica, prisma } from "~/db.server"; +import { tryCatch } from "@trigger.dev/core/utils"; +import { createJsonErrorObject, sanitizeError } from "@trigger.dev/core/v3"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import { $replica } from "~/db.server"; +import { env } from "~/env.server"; +import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server"; +import { logger } from "~/services/logger.server"; +import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server"; +import { reportInvocationUsage } from "~/services/platform.v3.server"; +import { MetadataTooLargeError } from "~/utils/packets"; import { createExceptionPropertiesFromError, eventRepository, recordRunDebugLog, } from "./eventRepository.server"; -import { createJsonErrorObject, sanitizeError } from "@trigger.dev/core/v3"; -import { logger } from "~/services/logger.server"; -import { safeJsonParse } from "~/utils/json"; -import type { Attributes } from "@opentelemetry/api"; -import { reportInvocationUsage } from "~/services/platform.v3.server"; import { roomFromFriendlyRunId, socketIo } from "./handleSocketIo.server"; import { engine } from "./runEngine.server"; import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server"; -import { RunId } from "@trigger.dev/core/v3/isomorphic"; -import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server"; -import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server"; -import { env } from "~/env.server"; -import { getTaskEventStoreTableForRun } from "./taskEventStore.server"; -import { MetadataTooLargeError } from "~/utils/packets"; export function registerRunEngineEventBusHandlers() { engine.eventBus.on("runSucceeded", async ({ time, run }) => { - try { - const completedEvent = await eventRepository.completeEvent( - getTaskEventStoreTableForRun(run), - run.spanId, - run.createdAt, - run.completedAt ?? undefined, - { - endTime: time, - attributes: { - isError: false, - output: - run.outputType === "application/store" || run.outputType === "text/plain" - ? run.output - : run.output - ? (safeJsonParse(run.output) as Attributes) - : undefined, - outputType: run.outputType, - }, - } - ); + const [taskRunError, taskRun] = await tryCatch( + $replica.taskRun.findFirstOrThrow({ + where: { + id: run.id, + }, + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) + ); - if (!completedEvent) { - logger.error("[runSucceeded] Failed to complete event for unknown reason", { - runId: run.id, - spanId: run.spanId, - }); - return; - } - } catch (error) { - logger.error("[runSucceeded] Failed to complete event", { - error: error instanceof Error ? error.message : error, + if (taskRunError) { + logger.error("[runSucceeded] Failed to find task run", { + error: taskRunError, + runId: run.id, + }); + return; + } + + const [completeSuccessfulRunEventError] = await tryCatch( + eventRepository.completeSuccessfulRunEvent({ + run: taskRun, + endTime: time, + }) + ); + + if (completeSuccessfulRunEventError) { + logger.error("[runSucceeded] Failed to complete successful run event", { + error: completeSuccessfulRunEventError, runId: run.id, - spanId: run.spanId, }); } }); @@ -73,251 +80,308 @@ export function registerRunEngineEventBusHandlers() { // Handle events engine.eventBus.on("runFailed", async ({ time, run }) => { - try { - const sanitizedError = sanitizeError(run.error); - const exception = createExceptionPropertiesFromError(sanitizedError); + const sanitizedError = sanitizeError(run.error); + const exception = createExceptionPropertiesFromError(sanitizedError); - const eventStore = getTaskEventStoreTableForRun(run); - - const completedEvent = await eventRepository.completeEvent( - eventStore, - run.spanId, - run.createdAt, - run.completedAt ?? undefined, - { - endTime: time, - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time, - properties: { - exception, - }, - }, - ], - } - ); + const [taskRunError, taskRun] = await tryCatch( + $replica.taskRun.findFirstOrThrow({ + where: { + id: run.id, + }, + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) + ); - if (!completedEvent) { - logger.error("[runFailed] Failed to complete event for unknown reason", { - runId: run.id, - spanId: run.spanId, - }); - return; - } + if (taskRunError) { + logger.error("[runFailed] Failed to find task run", { + error: taskRunError, + runId: run.id, + }); + return; + } - const inProgressEvents = await eventRepository.queryIncompleteEvents( - eventStore, - { - runId: completedEvent?.runId, - }, - run.createdAt, - run.completedAt ?? undefined - ); + const [completeFailedRunEventError] = await tryCatch( + eventRepository.completeFailedRunEvent({ + run: taskRun, + endTime: time, + exception, + }) + ); - await Promise.all( - inProgressEvents.map((event) => { - try { - const completedEvent = eventRepository.completeEvent( - eventStore, - run.spanId, - run.createdAt, - run.completedAt ?? undefined, - { - endTime: time, - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time, - properties: { - exception, - }, - }, - ], - } - ); - - if (!completedEvent) { - logger.error("[runFailed] Failed to complete in-progress event for unknown reason", { - runId: run.id, - spanId: run.spanId, - eventId: event.id, - }); - return; - } - } catch (error) { - logger.error("[runFailed] Failed to complete in-progress event", { - error: error instanceof Error ? error.message : error, - runId: run.id, - spanId: run.spanId, - eventId: event.id, - }); - } - }) - ); - } catch (error) { - logger.error("[runFailed] Failed to complete event", { - error: error instanceof Error ? error.message : error, + if (completeFailedRunEventError) { + logger.error("[runFailed] Failed to complete failed run event", { + error: completeFailedRunEventError, runId: run.id, - spanId: run.spanId, }); } }); engine.eventBus.on("runAttemptFailed", async ({ time, run }) => { - try { - const sanitizedError = sanitizeError(run.error); - const exception = createExceptionPropertiesFromError(sanitizedError); - const eventStore = getTaskEventStoreTableForRun(run); + const sanitizedError = sanitizeError(run.error); + const exception = createExceptionPropertiesFromError(sanitizedError); - const inProgressEvents = await eventRepository.queryIncompleteEvents( - eventStore, - { - runId: RunId.toFriendlyId(run.id), - spanId: { - not: run.spanId, - }, + const [taskRunError, taskRun] = await tryCatch( + $replica.taskRun.findFirstOrThrow({ + where: { + id: run.id, }, - run.createdAt, - run.completedAt ?? undefined - ); + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) + ); - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.crashEvent({ - event: event, - crashedAt: time, - exception, - }); - }) - ); - } catch (error) { - logger.error("[runAttemptFailed] Failed to complete event", { - error: error instanceof Error ? error.message : error, + if (taskRunError) { + logger.error("[runAttemptFailed] Failed to find task run", { + error: taskRunError, runId: run.id, - spanId: run.spanId, }); + return; } - }); - engine.eventBus.on("cachedRunCompleted", async ({ time, span, blockedRunId, hasError }) => { - try { - const blockedRun = await $replica.taskRun.findFirst({ - select: { - taskEventStore: true, - }, - where: { - id: blockedRunId, - }, + const [createAttemptFailedRunEventError] = await tryCatch( + eventRepository.createAttemptFailedRunEvent({ + run: taskRun, + endTime: time, + attemptNumber: run.attemptNumber, + exception, + }) + ); + + if (createAttemptFailedRunEventError) { + logger.error("[runAttemptFailed] Failed to create attempt failed run event", { + error: createAttemptFailedRunEventError, + runId: run.id, }); + } + }); - if (!blockedRun) { - logger.error("[cachedRunCompleted] Blocked run not found", { - blockedRunId, + engine.eventBus.on( + "cachedRunCompleted", + async ({ time, span, blockedRunId, hasError, cachedRunId }) => { + const [parentSpanId, spanId] = span.id.split(":"); + + if (!spanId || !parentSpanId) { + logger.debug("[cachedRunCompleted] Invalid span id", { + spanId, + parentSpanId, }); return; } - const eventStore = getTaskEventStoreTableForRun(blockedRun); - - const completedEvent = await eventRepository.completeEvent( - eventStore, - span.id, - span.createdAt, - time, - { - endTime: time, - attributes: { - isError: hasError, + const [cachedRunError, cachedRun] = await tryCatch( + $replica.taskRun.findFirstOrThrow({ + where: { + id: cachedRunId, }, - } + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) ); - if (!completedEvent) { - logger.error("[cachedRunCompleted] Failed to complete event for unknown reason", { - span, + if (cachedRunError) { + logger.error("[cachedRunCompleted] Failed to find cached run", { + error: cachedRunError, + cachedRunId, }); return; } - } catch (error) { - logger.error("[cachedRunCompleted] Failed to complete event for unknown reason", { - error: error instanceof Error ? error.message : error, - span, - }); - } - }); - - engine.eventBus.on("runExpired", async ({ time, run }) => { - try { - const eventStore = getTaskEventStoreTableForRun(run); - const completedEvent = await eventRepository.completeEvent( - eventStore, - run.spanId, - run.createdAt, - run.completedAt ?? undefined, - { - endTime: time, - attributes: { - isError: true, + const [blockedRunError, blockedRun] = await tryCatch( + $replica.taskRun.findFirst({ + where: { + id: blockedRunId, }, - events: [ - { - name: "exception", - time, - properties: { - exception: { - message: `Run expired because the TTL (${run.ttl}) was reached`, - }, - }, - }, - ], - } + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) ); - if (!completedEvent) { - logger.error("[runFailed] Failed to complete event for unknown reason", { - runId: run.id, - spanId: run.spanId, + if (blockedRunError) { + logger.error("[cachedRunCompleted] Failed to find blocked run", { + error: blockedRunError, + blockedRunId, + }); + } + + if (!blockedRun) { + logger.error("[cachedRunCompleted] Blocked run not found", { + blockedRunId, }); return; } - } catch (error) { - logger.error("[runExpired] Failed to complete event", { - error: error instanceof Error ? error.message : error, + + const [completeCachedRunEventError] = await tryCatch( + eventRepository.completeCachedRunEvent({ + run: cachedRun, + blockedRun, + spanId, + parentSpanId, + spanCreatedAt: span.createdAt, + isError: hasError, + endTime: time, + }) + ); + + if (completeCachedRunEventError) { + logger.error("[cachedRunCompleted] Failed to complete cached run event", { + error: completeCachedRunEventError, + cachedRunId, + }); + } + } + ); + + engine.eventBus.on("runExpired", async ({ time, run }) => { + if (!run.ttl) { + return; + } + + const [taskRunError, taskRun] = await tryCatch( + $replica.taskRun.findFirstOrThrow({ + where: { + id: run.id, + }, + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) + ); + + if (taskRunError) { + logger.error("[runExpired] Failed to find task run", { + error: taskRunError, + runId: run.id, + }); + return; + } + + const [completeExpiredRunEventError] = await tryCatch( + eventRepository.completeExpiredRunEvent({ + run: taskRun, + endTime: time, + ttl: run.ttl, + }) + ); + + if (completeExpiredRunEventError) { + logger.error("[runExpired] Failed to complete expired run event", { + error: completeExpiredRunEventError, runId: run.id, - spanId: run.spanId, }); } }); engine.eventBus.on("runCancelled", async ({ time, run }) => { - try { - const eventStore = getTaskEventStoreTableForRun(run); - - const inProgressEvents = await eventRepository.queryIncompleteEvents( - eventStore, - { - runId: run.friendlyId, + const [taskRunError, taskRun] = await tryCatch( + $replica.taskRun.findFirstOrThrow({ + where: { + id: run.id, }, - run.createdAt, - run.completedAt ?? undefined - ); + select: { + id: true, + friendlyId: true, + traceId: true, + spanId: true, + parentSpanId: true, + createdAt: true, + completedAt: true, + taskIdentifier: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + isTest: true, + organizationId: true, + }, + }) + ); - const error = createJsonErrorObject(run.error); + if (taskRunError) { + logger.error("[runCancelled] Task run not found", { + error: taskRunError, + runId: run.id, + }); + return; + } - await eventRepository.cancelEvents(inProgressEvents, time, error.message); - } catch (error) { - logger.error("[runCancelled] Failed to cancel event", { - error: error instanceof Error ? error.message : error, + const error = createJsonErrorObject(run.error); + + const [cancelRunEventError] = await tryCatch( + eventRepository.cancelRunEvent({ + reason: error.message, + run: taskRun, + cancelledAt: time, + }) + ); + + if (cancelRunEventError) { + logger.error("[runCancelled] Failed to cancel run event", { + error: cancelRunEventError, runId: run.id, - spanId: run.spanId, }); } }); diff --git a/apps/webapp/app/v3/services/cancelAttempt.server.ts b/apps/webapp/app/v3/services/cancelAttempt.server.ts index b889705e1e..04d61f42f8 100644 --- a/apps/webapp/app/v3/services/cancelAttempt.server.ts +++ b/apps/webapp/app/v3/services/cancelAttempt.server.ts @@ -72,25 +72,6 @@ export class CancelAttemptService extends BaseService { error: isCancellable ? { type: "STRING_ERROR", raw: reason } : undefined, }); }); - - const inProgressEvents = await eventRepository.queryIncompleteEvents( - getTaskEventStoreTableForRun(taskRunAttempt.taskRun), - { - runId: taskRunAttempt.taskRun.friendlyId, - }, - taskRunAttempt.taskRun.createdAt, - taskRunAttempt.taskRun.completedAt ?? undefined - ); - - logger.debug("Cancelling in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - }); - - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.cancelEvent(event, cancelledAt, reason); - }) - ); }); } } diff --git a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts index 8c208265de..ea152233ab 100644 --- a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts @@ -1,4 +1,4 @@ -import { type Prisma, type TaskRun } from "@trigger.dev/database"; +import { type Prisma } from "@trigger.dev/database"; import assertNever from "assert-never"; import { logger } from "~/services/logger.server"; import { eventRepository } from "../eventRepository.server"; @@ -8,9 +8,9 @@ import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskSta import { BaseService } from "./baseService.server"; import { CancelAttemptService } from "./cancelAttempt.server"; import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server"; -import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; -import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; import { CancelableTaskRun } from "./cancelTaskRun.server"; +import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { tryCatch } from "@trigger.dev/core/utils"; type ExtendedTaskRun = Prisma.TaskRunGetPayload<{ include: { @@ -92,6 +92,7 @@ export class CancelTaskRunServiceV1 extends BaseService { }, runtimeEnvironment: true, lockedToVersion: true, + project: true, }, attemptStatus: "CANCELED", error: { @@ -100,21 +101,20 @@ export class CancelTaskRunServiceV1 extends BaseService { }, }); - const inProgressEvents = await eventRepository.queryIncompleteEvents( - getTaskEventStoreTableForRun(taskRun), - { - runId: taskRun.friendlyId, - }, - taskRun.createdAt, - taskRun.completedAt ?? undefined + const [cancelRunEventError] = await tryCatch( + eventRepository.cancelRunEvent({ + reason: opts.reason, + run: cancelledTaskRun, + cancelledAt: opts.cancelledAt, + }) ); - logger.debug("Cancelling in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - eventCount: inProgressEvents.length, - }); - - await eventRepository.cancelEvents(inProgressEvents, opts.cancelledAt, opts.reason); + if (cancelRunEventError) { + logger.error("[CancelTaskRunServiceV1] Failed to cancel run event", { + error: cancelRunEventError, + runId: cancelledTaskRun.id, + }); + } // Cancel any in progress attempts if (opts.cancelAttempts) { diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index b9033a97e9..7998204b62 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -33,6 +33,7 @@ import { CancelAttemptService } from "./cancelAttempt.server"; import { CreateCheckpointService } from "./createCheckpoint.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { RetryAttemptService } from "./retryAttempt.server"; +import { tryCatch } from "@trigger.dev/core/utils"; type FoundAttempt = Awaited>; @@ -164,27 +165,20 @@ export class CompleteAttemptService extends BaseService { env, }); - // Now we need to "complete" the task run event/span - await eventRepository.completeEvent( - getTaskEventStoreTableForRun(taskRunAttempt.taskRun), - taskRunAttempt.taskRun.spanId, - taskRunAttempt.taskRun.createdAt, - taskRunAttempt.taskRun.completedAt ?? undefined, - { + const [completeSuccessfulRunEventError] = await tryCatch( + eventRepository.completeSuccessfulRunEvent({ + run: taskRunAttempt.taskRun, endTime: new Date(), - attributes: { - isError: false, - output: - completion.outputType === "application/store" || completion.outputType === "text/plain" - ? completion.output - : completion.output - ? (safeJsonParse(completion.output) as Attributes) - : undefined, - outputType: completion.outputType, - }, - } + }) ); + if (completeSuccessfulRunEventError) { + logger.error("[CompleteAttemptService] Failed to complete successful run event", { + error: completeSuccessfulRunEventError, + runId: taskRunAttempt.taskRunId, + }); + } + return "COMPLETED"; } @@ -322,29 +316,21 @@ export class CompleteAttemptService extends BaseService { exitRun(taskRunAttempt.taskRunId); } - // Now we need to "complete" the task run event/span - await eventRepository.completeEvent( - getTaskEventStoreTableForRun(taskRunAttempt.taskRun), - taskRunAttempt.taskRun.spanId, - taskRunAttempt.taskRun.createdAt, - taskRunAttempt.taskRun.completedAt ?? undefined, - { + const [completeFailedRunEventError] = await tryCatch( + eventRepository.completeFailedRunEvent({ + run: taskRunAttempt.taskRun, endTime: failedAt, - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: failedAt, - properties: { - exception: createExceptionPropertiesFromError(sanitizedError), - }, - }, - ], - } + exception: createExceptionPropertiesFromError(sanitizedError), + }) ); + if (completeFailedRunEventError) { + logger.error("[CompleteAttemptService] Failed to complete failed run event", { + error: completeFailedRunEventError, + runId: taskRunAttempt.taskRunId, + }); + } + await this._prisma.taskRun.update({ where: { id: taskRunAttempt.taskRunId, @@ -385,64 +371,43 @@ export class CompleteAttemptService extends BaseService { return "COMPLETED"; } - const inProgressEvents = await eventRepository.queryIncompleteEvents( - getTaskEventStoreTableForRun(taskRunAttempt.taskRun), - { - runId: taskRunAttempt.taskRun.friendlyId, - }, - taskRunAttempt.taskRun.createdAt, - taskRunAttempt.taskRun.completedAt ?? undefined - ); - // Handle in-progress events switch (status) { case "CRASHED": { - logger.debug("[CompleteAttemptService] Crashing in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - }); - - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.crashEvent({ - event, - crashedAt: failedAt, - exception: createExceptionPropertiesFromError(sanitizedError), - }); + const [createAttemptFailedEventError] = await tryCatch( + eventRepository.createAttemptFailedRunEvent({ + run: taskRunAttempt.taskRun, + endTime: failedAt, + attemptNumber: taskRunAttempt.number, + exception: createExceptionPropertiesFromError(sanitizedError), }) ); + if (createAttemptFailedEventError) { + logger.error("[CompleteAttemptService] Failed to create attempt failed run event", { + error: createAttemptFailedEventError, + runId: taskRunAttempt.taskRunId, + }); + } + break; } case "SYSTEM_FAILURE": { - logger.debug("[CompleteAttemptService] Failing in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - }); - - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.completeEvent( - getTaskEventStoreTableForRun(taskRunAttempt.taskRun), - event.spanId, - taskRunAttempt.taskRun.createdAt, - taskRunAttempt.taskRun.completedAt ?? undefined, - { - endTime: failedAt, - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: failedAt, - properties: { - exception: createExceptionPropertiesFromError(sanitizedError), - }, - }, - ], - } - ); + const [createAttemptFailedEventError] = await tryCatch( + eventRepository.createAttemptFailedRunEvent({ + run: taskRunAttempt.taskRun, + endTime: failedAt, + attemptNumber: taskRunAttempt.number, + exception: createExceptionPropertiesFromError(sanitizedError), }) ); + + if (createAttemptFailedEventError) { + logger.error("[CompleteAttemptService] Failed to create attempt failed run event", { + error: createAttemptFailedEventError, + runId: taskRunAttempt.taskRunId, + }); + } } } diff --git a/apps/webapp/app/v3/services/crashTaskRun.server.ts b/apps/webapp/app/v3/services/crashTaskRun.server.ts index b6c463a53c..61ef107dab 100644 --- a/apps/webapp/app/v3/services/crashTaskRun.server.ts +++ b/apps/webapp/app/v3/services/crashTaskRun.server.ts @@ -8,6 +8,7 @@ import { sanitizeError, TaskRunErrorCodes, TaskRunInternalError } from "@trigger import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; +import { tryCatch } from "@trigger.dev/core/utils"; export type CrashTaskRunServiceOptions = { reason?: string; @@ -120,34 +121,25 @@ export class CrashTaskRunService extends BaseService { }, }); - const inProgressEvents = await eventRepository.queryIncompleteEvents( - getTaskEventStoreTableForRun(taskRun), - { - runId: taskRun.friendlyId, - }, - taskRun.createdAt, - taskRun.completedAt ?? undefined, - options?.overrideCompletion - ); - - logger.debug("[CrashTaskRunService] Crashing in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - }); - - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.crashEvent({ - event: event, - crashedAt: opts.crashedAt, - exception: { - type: opts.errorCode ?? TaskRunErrorCodes.TASK_RUN_CRASHED, - message: opts.reason, - stacktrace: opts.logs, - }, - }); + const [createAttemptFailedEventError] = await tryCatch( + eventRepository.completeFailedRunEvent({ + run: crashedTaskRun, + endTime: opts.crashedAt, + exception: { + type: opts.errorCode ?? TaskRunErrorCodes.TASK_RUN_CRASHED, + message: opts.reason, + stacktrace: opts.logs, + }, }) ); + if (createAttemptFailedEventError) { + logger.error("[CrashTaskRunService] Failed to complete failed run event", { + error: createAttemptFailedEventError, + runId: crashedTaskRun.id, + }); + } + if (!opts.crashAttempts) { return; } diff --git a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts index 9e97795eaa..6658907b0d 100644 --- a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts +++ b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts @@ -5,6 +5,7 @@ import { eventRepository } from "../eventRepository.server"; import { BaseService } from "./baseService.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; +import { tryCatch } from "@trigger.dev/core/utils"; export class ExpireEnqueuedRunService extends BaseService { public static async ack(runId: string, tx?: PrismaClientOrTransaction) { @@ -78,28 +79,21 @@ export class ExpireEnqueuedRunService extends BaseService { }, }); - await eventRepository.completeEvent( - getTaskEventStoreTableForRun(run), - run.spanId, - run.createdAt, - run.completedAt ?? undefined, - { - endTime: new Date(), - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: new Date(), - properties: { - exception: { - message: `Run expired because the TTL (${run.ttl}) was reached`, - }, - }, - }, - ], + if (run.ttl) { + const [completeExpiredRunEventError] = await tryCatch( + eventRepository.completeExpiredRunEvent({ + run, + endTime: new Date(), + ttl: run.ttl, + }) + ); + + if (completeExpiredRunEventError) { + logger.error("[ExpireEnqueuedRunService] Failed to complete expired run event", { + error: completeExpiredRunEventError, + runId: run.id, + }); } - ); + } } } diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts index 27fc498112..251db6642c 100644 --- a/apps/webapp/app/v3/taskEventStore.server.ts +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -21,6 +21,7 @@ export type TraceEvent = Pick< | "events" | "environmentType" | "kind" + | "attemptNumber" >; export type DetailedTraceEvent = Pick< @@ -47,6 +48,7 @@ export type DetailedTraceEvent = Pick< | "machinePreset" | "properties" | "output" + | "attemptNumber" >; export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned"; @@ -188,7 +190,8 @@ export class TaskEventStore { level, events, "environmentType", - "kind" + "kind", + "attemptNumber" FROM "TaskEventPartitioned" WHERE "traceId" = ${traceId} @@ -220,7 +223,8 @@ export class TaskEventStore { level, events, "environmentType", - "kind" + "kind", + "attemptNumber" FROM "TaskEvent" WHERE "traceId" = ${traceId} ${ @@ -273,7 +277,8 @@ export class TaskEventStore { "queueName", "machinePreset", properties, - output + output, + "attemptNumber" FROM "TaskEventPartitioned" WHERE "traceId" = ${traceId} @@ -311,7 +316,8 @@ export class TaskEventStore { "queueName", "machinePreset", properties, - output + output, + "attemptNumber" FROM "TaskEvent" WHERE "traceId" = ${traceId} ${ diff --git a/internal-packages/run-engine/src/engine/eventBus.ts b/internal-packages/run-engine/src/engine/eventBus.ts index 178bdd5b3b..c602e47b30 100644 --- a/internal-packages/run-engine/src/engine/eventBus.ts +++ b/internal-packages/run-engine/src/engine/eventBus.ts @@ -287,6 +287,7 @@ export type EventBusEvents = { }; hasError: boolean; blockedRunId: string; + cachedRunId?: string; }, ]; runMetadataUpdated: [ diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 74062ed17e..4ebbd55779 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -160,6 +160,7 @@ export class WaitpointSystem { }, blockedRunId: run.taskRunId, hasError: output?.isError ?? false, + cachedRunId: waitpoint.completedByTaskRunId ?? undefined, }); } } diff --git a/packages/core/src/v3/schemas/openTelemetry.ts b/packages/core/src/v3/schemas/openTelemetry.ts index 140f1c45b8..47d9d9a50d 100644 --- a/packages/core/src/v3/schemas/openTelemetry.ts +++ b/packages/core/src/v3/schemas/openTelemetry.ts @@ -28,6 +28,18 @@ export const CancellationSpanEvent = z.object({ export type CancellationSpanEvent = z.infer; +export const AttemptFailedSpanEvent = z.object({ + name: z.literal("attempt_failed"), + time: z.coerce.date(), + properties: z.object({ + exception: ExceptionEventProperties, + attemptNumber: z.number(), + runId: z.string(), + }), +}); + +export type AttemptFailedSpanEvent = z.infer; + export const OtherSpanEvent = z.object({ name: z.string(), time: z.coerce.date(), @@ -36,7 +48,12 @@ export const OtherSpanEvent = z.object({ export type OtherSpanEvent = z.infer; -export const SpanEvent = z.union([ExceptionSpanEvent, CancellationSpanEvent, OtherSpanEvent]); +export const SpanEvent = z.union([ + ExceptionSpanEvent, + CancellationSpanEvent, + AttemptFailedSpanEvent, + OtherSpanEvent, +]); export type SpanEvent = z.infer; @@ -52,6 +69,10 @@ export function isCancellationSpanEvent(event: SpanEvent): event is Cancellation return event.name === "cancellation"; } +export function isAttemptFailedSpanEvent(event: SpanEvent): event is AttemptFailedSpanEvent { + return event.name === "attempt_failed"; +} + export const SpanMessagingEvent = z.object({ system: z.string().optional(), client_id: z.string().optional(), diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 38e7d7c19a..6de4c77a5f 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -352,8 +352,7 @@ export class TaskExecutor { ? runTimelineMetrics.convertMetricsToSpanEvents() : undefined, }, - traceContext.extractContext(), - signal + traceContext.extractContext() ); return { result }; diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 14fdd0faad..b329faa741 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -1,6 +1,7 @@ import { batch, logger, task, tasks, timeout, wait } from "@trigger.dev/sdk"; import { setTimeout } from "timers/promises"; import { ResourceMonitor } from "../resourceMonitor.js"; +import { fixedLengthTask } from "./batches.js"; export const helloWorldTask = task({ id: "hello-world", diff --git a/references/hello-world/src/trigger/idempotency.ts b/references/hello-world/src/trigger/idempotency.ts index 6ea7c1c5ea..55e3c0bc9c 100644 --- a/references/hello-world/src/trigger/idempotency.ts +++ b/references/hello-world/src/trigger/idempotency.ts @@ -282,3 +282,22 @@ export const idempotencyTriggerByTaskAndWait = task({ logger.log("Results 2", { results2 }); }, }); + +export const idempotencyTriggerAndWaitWithInProgressRun = task({ + id: "idempotency-trigger-and-wait-with-in-progress-run", + maxDuration: 60, + run: async () => { + await childTask.trigger( + { message: "Hello, world!", duration: 5000, failureChance: 100 }, + { + idempotencyKey: "b", + } + ); + await childTask.triggerAndWait( + { message: "Hello, world!", duration: 5000, failureChance: 0 }, + { + idempotencyKey: "b", + } + ); + }, +});