Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cc9f2f1
WIP
ericallam Sep 21, 2025
233a5b2
WIP clickhouse repo
ericallam Sep 21, 2025
2d12c7b
wip
ericallam Sep 21, 2025
9286632
cleaned up what is returned from EventRepository.getSpan
ericallam Sep 21, 2025
4b0f413
simplify getSpan and remove some unnecessary properties
ericallam Sep 21, 2025
f1f9ef2
removed a bunch of unnecessary data
ericallam Sep 22, 2025
6f4be25
use feature flags to determine the run taskEvent store
ericallam Sep 22, 2025
fd49360
more clickhouse events stuff
ericallam Sep 23, 2025
793c00a
implement more
ericallam Sep 23, 2025
1f68ff8
Runs always show a trace
ericallam Sep 24, 2025
a27f6a7
crashEvent isn't used anymore
ericallam Sep 24, 2025
f9c70c9
implement ancestor overrides for trace summary
ericallam Sep 24, 2025
4c79831
implement getSpan
ericallam Sep 24, 2025
1616bf4
support debug logs
ericallam Sep 24, 2025
7c5ac9b
add support to prod v4 and v3 runs
ericallam Sep 24, 2025
bb66b43
v3 task trigger works with clickhouse
ericallam Sep 24, 2025
fe142b6
wip trace detail
ericallam Sep 25, 2025
d6cf5e4
Some fixes for the conflicts
ericallam Sep 26, 2025
8f02687
implemented both detailed trace view and events/download logs in clic…
ericallam Sep 27, 2025
2494ee4
move over the remaining v3 places that used the postgresql event repo…
ericallam Sep 27, 2025
ebdb5e9
Add percent-based rollout of clickhouse logs
ericallam Sep 27, 2025
73f42db
Fix cache memory store unbounded memory usage
ericallam Sep 29, 2025
cd3cff7
Pass the traceId into getSpanOriginalRunId
ericallam Sep 29, 2025
7d81d5d
better content type checking
ericallam Sep 29, 2025
ee299b0
set override duration correctly
ericallam Sep 29, 2025
cde851f
Improved flushing condition
ericallam Sep 29, 2025
f3de091
performance improvements
ericallam Oct 1, 2025
865a704
Add usage mvs and tables
ericallam Oct 1, 2025
bdbb18f
Allow more live reloading when using clickhouse event repo
ericallam Oct 1, 2025
2515fd5
increase log expiration ttl to 1 year
ericallam Oct 1, 2025
a98a803
Fix a few coderabbit comments
ericallam Oct 1, 2025
17a2cba
fix typecheck errors
ericallam Oct 1, 2025
acec66d
check attributes
ericallam Oct 1, 2025
4dac953
Remove unneeded file
ericallam Oct 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
"**/node_modules/**": true,
"packages/cli-v3/e2e": true
},
"vitest.disableWorkspaceWarning": true
"vitest.disableWorkspaceWarning": true,
"typescript.experimental.useTsgo": false
}
5 changes: 5 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ import { logger } from "./services/logger.server";
import { Prisma } from "./db.server";
import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
import { resourceMonitor } from "./services/resourceMonitor.server";

if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
eventLoopMonitor.enable();
Expand All @@ -250,3 +251,7 @@ if (remoteBuildsEnabled()) {
} else {
console.log("🏗️ Local builds enabled");
}

if (env.RESOURCE_MONITOR_ENABLED === "1") {
resourceMonitor.startMonitoring(1000);
}
18 changes: 18 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ const EnvironmentSchema = z
CENTS_PER_RUN: z.coerce.number().default(0),

EVENT_LOOP_MONITOR_ENABLED: z.string().default("1"),
RESOURCE_MONITOR_ENABLED: z.string().default("0"),
MAXIMUM_LIVE_RELOADING_EVENTS: z.coerce.number().int().default(1000),
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
Expand Down Expand Up @@ -1110,6 +1111,23 @@ const EnvironmentSchema = z
CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),

EVENTS_CLICKHOUSE_URL: z
.string()
.optional()
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),

// Bootstrap
TRIGGER_BOOTSTRAP_ENABLED: z.string().default("0"),
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class ApiRetrieveRunPresenter {
},
select: {
...commonRunSelect,
traceId: true,
payload: true,
payloadType: true,
output: true,
Expand Down
50 changes: 45 additions & 5 deletions apps/webapp/app/presenters/v3/RunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
import { prisma, type PrismaClient } from "~/db.server";
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
import { getUsername } from "~/utils/username";
import { eventRepository } from "~/v3/eventRepository.server";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { env } from "~/env.server";

type Result = Awaited<ReturnType<RunPresenter["call"]>>;
export type Run = Result["run"];
Expand Down Expand Up @@ -45,9 +47,11 @@ export class RunPresenter {
id: true,
createdAt: true,
taskEventStore: true,
taskIdentifier: true,
number: true,
traceId: true,
spanId: true,
parentSpanId: true,
friendlyId: true,
status: true,
startedAt: true,
Expand Down Expand Up @@ -137,21 +141,55 @@ export class RunPresenter {
return {
run: runData,
trace: undefined,
maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS,
};
}

const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);

// get the events
const traceSummary = await eventRepository.getTraceSummary(
let traceSummary = await eventRepository.getTraceSummary(
getTaskEventStoreTableForRun(run),
run.runtimeEnvironment.id,
run.traceId,
run.rootTaskRun?.createdAt ?? run.createdAt,
run.completedAt ?? undefined,
{ includeDebugLogs: showDebug }
);

if (!traceSummary) {
return {
run: runData,
trace: undefined,
const spanSummary: SpanSummary = {
id: run.spanId,
parentId: run.parentSpanId ?? undefined,
runId: run.friendlyId,
data: {
message: run.taskIdentifier,
style: { icon: "task", variant: "primary" },
events: [],
startTime: run.createdAt,
duration: 0,
isError:
run.status === "COMPLETED_WITH_ERRORS" ||
run.status === "CRASHED" ||
run.status === "EXPIRED" ||
run.status === "SYSTEM_FAILURE" ||
run.status === "TIMED_OUT",
isPartial:
run.status === "DELAYED" ||
run.status === "PENDING" ||
run.status === "PAUSED" ||
run.status === "RETRYING_AFTER_FAILURE" ||
run.status === "DEQUEUED" ||
run.status === "EXECUTING",
isCancelled: run.status === "CANCELED",
isDebug: false,
level: "TRACE",
},
};

traceSummary = {
rootSpan: spanSummary,
spans: [spanSummary],
};
}

Expand Down Expand Up @@ -220,7 +258,9 @@ export class RunPresenter {
queuedDuration: run.startedAt
? millisecondsToNanoseconds(run.startedAt.getTime() - run.createdAt.getTime())
: undefined,
overridesBySpanId: traceSummary.overridesBySpanId,
},
maximumLiveReloadingSetting: eventRepository.maximumLiveReloadingSetting,
};
}
}
4 changes: 2 additions & 2 deletions apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { eventStream } from "remix-utils/sse/server";
import { PrismaClient, prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { throttle } from "~/utils/throttle";
import { eventRepository } from "~/v3/eventRepository.server";
import { tracePubSub } from "~/v3/services/tracePubSub.server";

const pingInterval = 1000;

Expand Down Expand Up @@ -41,7 +41,7 @@ export class RunStreamPresenter {

let pinger: NodeJS.Timeout | undefined = undefined;

const { unsubscribe, eventEmitter } = await eventRepository.subscribeToTrace(run.traceId);
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);

return eventStream(request.signal, (send, close) => {
const safeSend = (args: { event?: string; data: string }) => {
Expand Down
96 changes: 64 additions & 32 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@ import {
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { logger } from "~/services/logger.server";
import { eventRepository, rehydrateAttribute } from "~/v3/eventRepository.server";
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { getTaskEventStoreTableForRun, type TaskEventStoreTable } from "~/v3/taskEventStore.server";
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { WaitpointPresenter } from "./WaitpointPresenter.server";
import { engine } from "~/v3/runEngine.server";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";

type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
export type Span = NonNullable<NonNullable<Result>["span"]>;
export type SpanRun = NonNullable<NonNullable<Result>["run"]>;
type FindRunResult = NonNullable<
Awaited<ReturnType<InstanceType<typeof SpanPresenter>["findRun"]>>
>;
type GetSpanResult = NonNullable<Awaited<ReturnType<(typeof eventRepository)["getSpan"]>>>;
type GetSpanResult = SpanDetail;

export class SpanPresenter extends BasePresenter {
public async call({
Expand Down Expand Up @@ -74,14 +76,20 @@ export class SpanPresenter extends BasePresenter {
return;
}

const { traceId } = parentRun;

const eventRepository = resolveEventRepositoryForStore(parentRun.taskEventStore);

const eventStore = getTaskEventStoreTableForRun(parentRun);

const run = await this.getRun({
eventStore,
environmentId: parentRun.runtimeEnvironmentId,
traceId,
eventRepository,
spanId,
createdAt: parentRun.createdAt,
completedAt: parentRun.completedAt,
environmentId: parentRun.runtimeEnvironmentId,
});
if (run) {
return {
Expand All @@ -93,10 +101,12 @@ export class SpanPresenter extends BasePresenter {
const span = await this.#getSpan({
eventStore,
spanId,
traceId,
environmentId: parentRun.runtimeEnvironmentId,
projectId: parentRun.projectId,
createdAt: parentRun.createdAt,
completedAt: parentRun.completedAt,
eventRepository,
});

if (!span) {
Expand All @@ -112,29 +122,30 @@ export class SpanPresenter extends BasePresenter {
async getRun({
eventStore,
environmentId,
traceId,
eventRepository,
spanId,
createdAt,
completedAt,
}: {
eventStore: TaskEventStoreTable;
environmentId: string;
traceId: string;
eventRepository: IEventRepository;
spanId: string;
createdAt: Date;
completedAt: Date | null;
}) {
const span = await eventRepository.getSpan({
storeTable: eventStore,
spanId,
const originalRunId = await eventRepository.getSpanOriginalRunId(
eventStore,
environmentId,
startCreatedAt: createdAt,
endCreatedAt: completedAt ?? undefined,
});

if (!span) {
return;
}
spanId,
traceId,
createdAt,
completedAt ?? undefined
);

const run = await this.findRun({ span, spanId });
const run = await this.findRun({ originalRunId, spanId, environmentId });

if (!run) {
return;
Expand Down Expand Up @@ -259,7 +270,7 @@ export class SpanPresenter extends BasePresenter {
workerQueue: run.workerQueue,
traceId: run.traceId,
spanId: run.spanId,
isCached: !!span.originalRun,
isCached: !!originalRunId,
machinePreset: machine?.name,
externalTraceId,
};
Expand Down Expand Up @@ -294,7 +305,15 @@ export class SpanPresenter extends BasePresenter {
};
}

async findRun({ span, spanId }: { span: GetSpanResult; spanId: string }) {
async findRun({
originalRunId,
spanId,
environmentId,
}: {
originalRunId?: string;
spanId: string;
environmentId: string;
}) {
const run = await this._replica.taskRun.findFirst({
select: {
id: true,
Expand Down Expand Up @@ -404,12 +423,14 @@ export class SpanPresenter extends BasePresenter {
},
},
},
where: span.originalRun
where: originalRunId
? {
friendlyId: span.originalRun,
friendlyId: originalRunId,
runtimeEnvironmentId: environmentId,
}
: {
spanId,
runtimeEnvironmentId: environmentId,
},
});

Expand All @@ -418,27 +439,32 @@ export class SpanPresenter extends BasePresenter {

async #getSpan({
eventStore,
eventRepository,
traceId,
spanId,
environmentId,
projectId,
createdAt,
completedAt,
}: {
eventRepository: IEventRepository;
traceId: string;
spanId: string;
environmentId: string;
projectId: string;
eventStore: TaskEventStoreTable;
createdAt: Date;
completedAt: Date | null;
}) {
const span = await eventRepository.getSpan({
storeTable: eventStore,
spanId,
const span = await eventRepository.getSpan(
eventStore,
environmentId,
startCreatedAt: createdAt,
endCreatedAt: completedAt ?? undefined,
options: { includeDebugLogs: true },
});
spanId,
traceId,
createdAt,
completedAt ?? undefined,
{ includeDebugLogs: true }
);

if (!span) {
return;
Expand All @@ -451,23 +477,29 @@ export class SpanPresenter extends BasePresenter {
spanId: true,
createdAt: true,
number: true,
lockedToVersion: {
select: {
version: true,
},
},
taskVersion: true,
},
where: {
parentSpanId: spanId,
},
});

const data = {
...span,
spanId: span.spanId,
parentId: span.parentId,
message: span.message,
isError: span.isError,
isPartial: span.isPartial,
isCancelled: span.isCancelled,
level: span.level,
startTime: span.startTime,
duration: span.duration,
events: span.events,
style: span.style,
properties: span.properties ? JSON.stringify(span.properties, null, 2) : undefined,
entity: span.entity,
metadata: span.metadata,
triggeredRuns,
showActionBar: span.show?.actions === true,
};

switch (span.entity.type) {
Expand Down
Loading