Skip to content

Commit 128bc43

Browse files
authored
feat(otel): Add support for storing run spans and log data in Clickhouse (#2567)
1 parent 0597691 commit 128bc43

File tree

87 files changed

+6855
-2101
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

87 files changed

+6855
-2101
lines changed

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66
"**/node_modules/**": true,
77
"packages/cli-v3/e2e": true
88
},
9-
"vitest.disableWorkspaceWarning": true
9+
"vitest.disableWorkspaceWarning": true,
10+
"typescript.experimental.useTsgo": false
1011
}

apps/webapp/app/entry.server.tsx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ import { logger } from "./services/logger.server";
240240
import { Prisma } from "./db.server";
241241
import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
242242
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
243+
import { resourceMonitor } from "./services/resourceMonitor.server";
243244

244245
if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
245246
eventLoopMonitor.enable();
@@ -250,3 +251,7 @@ if (remoteBuildsEnabled()) {
250251
} else {
251252
console.log("🏗️ Local builds enabled");
252253
}
254+
255+
if (env.RESOURCE_MONITOR_ENABLED === "1") {
256+
resourceMonitor.startMonitoring(1000);
257+
}

apps/webapp/app/env.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ const EnvironmentSchema = z
493493
CENTS_PER_RUN: z.coerce.number().default(0),
494494

495495
EVENT_LOOP_MONITOR_ENABLED: z.string().default("1"),
496+
RESOURCE_MONITOR_ENABLED: z.string().default("0"),
496497
MAXIMUM_LIVE_RELOADING_EVENTS: z.coerce.number().int().default(1000),
497498
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
498499
MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
@@ -1110,6 +1111,23 @@ const EnvironmentSchema = z
11101111
CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
11111112
CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
11121113

1114+
EVENTS_CLICKHOUSE_URL: z
1115+
.string()
1116+
.optional()
1117+
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
1118+
EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
1119+
EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1120+
EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1121+
EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
1122+
EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
1123+
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
1124+
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
1125+
EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
1126+
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
1127+
EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
1128+
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
1129+
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),
1130+
11131131
// Bootstrap
11141132
TRIGGER_BOOTSTRAP_ENABLED: z.string().default("0"),
11151133
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: z.string().optional(),

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export class ApiRetrieveRunPresenter {
7575
},
7676
select: {
7777
...commonRunSelect,
78+
traceId: true,
7879
payload: true,
7980
payloadType: true,
8081
output: true,

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
33
import { prisma, type PrismaClient } from "~/db.server";
44
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
55
import { getUsername } from "~/utils/username";
6-
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
7+
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
78
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
89
import { isFinalRunStatus } from "~/v3/taskStatus";
10+
import { env } from "~/env.server";
911

1012
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
1113
export type Run = Result["run"];
@@ -45,9 +47,11 @@ export class RunPresenter {
4547
id: true,
4648
createdAt: true,
4749
taskEventStore: true,
50+
taskIdentifier: true,
4851
number: true,
4952
traceId: true,
5053
spanId: true,
54+
parentSpanId: true,
5155
friendlyId: true,
5256
status: true,
5357
startedAt: true,
@@ -137,21 +141,55 @@ export class RunPresenter {
137141
return {
138142
run: runData,
139143
trace: undefined,
144+
maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS,
140145
};
141146
}
142147

148+
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
149+
143150
// get the events
144-
const traceSummary = await eventRepository.getTraceSummary(
151+
let traceSummary = await eventRepository.getTraceSummary(
145152
getTaskEventStoreTableForRun(run),
153+
run.runtimeEnvironment.id,
146154
run.traceId,
147155
run.rootTaskRun?.createdAt ?? run.createdAt,
148156
run.completedAt ?? undefined,
149157
{ includeDebugLogs: showDebug }
150158
);
159+
151160
if (!traceSummary) {
152-
return {
153-
run: runData,
154-
trace: undefined,
161+
const spanSummary: SpanSummary = {
162+
id: run.spanId,
163+
parentId: run.parentSpanId ?? undefined,
164+
runId: run.friendlyId,
165+
data: {
166+
message: run.taskIdentifier,
167+
style: { icon: "task", variant: "primary" },
168+
events: [],
169+
startTime: run.createdAt,
170+
duration: 0,
171+
isError:
172+
run.status === "COMPLETED_WITH_ERRORS" ||
173+
run.status === "CRASHED" ||
174+
run.status === "EXPIRED" ||
175+
run.status === "SYSTEM_FAILURE" ||
176+
run.status === "TIMED_OUT",
177+
isPartial:
178+
run.status === "DELAYED" ||
179+
run.status === "PENDING" ||
180+
run.status === "PAUSED" ||
181+
run.status === "RETRYING_AFTER_FAILURE" ||
182+
run.status === "DEQUEUED" ||
183+
run.status === "EXECUTING",
184+
isCancelled: run.status === "CANCELED",
185+
isDebug: false,
186+
level: "TRACE",
187+
},
188+
};
189+
190+
traceSummary = {
191+
rootSpan: spanSummary,
192+
spans: [spanSummary],
155193
};
156194
}
157195

@@ -220,7 +258,9 @@ export class RunPresenter {
220258
queuedDuration: run.startedAt
221259
? millisecondsToNanoseconds(run.startedAt.getTime() - run.createdAt.getTime())
222260
: undefined,
261+
overridesBySpanId: traceSummary.overridesBySpanId,
223262
},
263+
maximumLiveReloadingSetting: eventRepository.maximumLiveReloadingSetting,
224264
};
225265
}
226266
}

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { eventStream } from "remix-utils/sse/server";
33
import { PrismaClient, prisma } from "~/db.server";
44
import { logger } from "~/services/logger.server";
55
import { throttle } from "~/utils/throttle";
6-
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { tracePubSub } from "~/v3/services/tracePubSub.server";
77

88
const pingInterval = 1000;
99

@@ -41,7 +41,7 @@ export class RunStreamPresenter {
4141

4242
let pinger: NodeJS.Timeout | undefined = undefined;
4343

44-
const { unsubscribe, eventEmitter } = await eventRepository.subscribeToTrace(run.traceId);
44+
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
4545

4646
return eventStream(request.signal, (send, close) => {
4747
const safeSend = (args: { event?: string; data: string }) => {

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,23 @@ import {
1010
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
1111
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
1212
import { logger } from "~/services/logger.server";
13-
import { eventRepository, rehydrateAttribute } from "~/v3/eventRepository.server";
13+
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
1414
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1515
import { getTaskEventStoreTableForRun, type TaskEventStoreTable } from "~/v3/taskEventStore.server";
1616
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1717
import { BasePresenter } from "./basePresenter.server";
1818
import { WaitpointPresenter } from "./WaitpointPresenter.server";
1919
import { engine } from "~/v3/runEngine.server";
20+
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
21+
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
2022

2123
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
2224
export type Span = NonNullable<NonNullable<Result>["span"]>;
2325
export type SpanRun = NonNullable<NonNullable<Result>["run"]>;
2426
type FindRunResult = NonNullable<
2527
Awaited<ReturnType<InstanceType<typeof SpanPresenter>["findRun"]>>
2628
>;
27-
type GetSpanResult = NonNullable<Awaited<ReturnType<(typeof eventRepository)["getSpan"]>>>;
29+
type GetSpanResult = SpanDetail;
2830

2931
export class SpanPresenter extends BasePresenter {
3032
public async call({
@@ -74,14 +76,20 @@ export class SpanPresenter extends BasePresenter {
7476
return;
7577
}
7678

79+
const { traceId } = parentRun;
80+
81+
const eventRepository = resolveEventRepositoryForStore(parentRun.taskEventStore);
82+
7783
const eventStore = getTaskEventStoreTableForRun(parentRun);
7884

7985
const run = await this.getRun({
8086
eventStore,
81-
environmentId: parentRun.runtimeEnvironmentId,
87+
traceId,
88+
eventRepository,
8289
spanId,
8390
createdAt: parentRun.createdAt,
8491
completedAt: parentRun.completedAt,
92+
environmentId: parentRun.runtimeEnvironmentId,
8593
});
8694
if (run) {
8795
return {
@@ -93,10 +101,12 @@ export class SpanPresenter extends BasePresenter {
93101
const span = await this.#getSpan({
94102
eventStore,
95103
spanId,
104+
traceId,
96105
environmentId: parentRun.runtimeEnvironmentId,
97106
projectId: parentRun.projectId,
98107
createdAt: parentRun.createdAt,
99108
completedAt: parentRun.completedAt,
109+
eventRepository,
100110
});
101111

102112
if (!span) {
@@ -112,29 +122,30 @@ export class SpanPresenter extends BasePresenter {
112122
async getRun({
113123
eventStore,
114124
environmentId,
125+
traceId,
126+
eventRepository,
115127
spanId,
116128
createdAt,
117129
completedAt,
118130
}: {
119131
eventStore: TaskEventStoreTable;
120132
environmentId: string;
133+
traceId: string;
134+
eventRepository: IEventRepository;
121135
spanId: string;
122136
createdAt: Date;
123137
completedAt: Date | null;
124138
}) {
125-
const span = await eventRepository.getSpan({
126-
storeTable: eventStore,
127-
spanId,
139+
const originalRunId = await eventRepository.getSpanOriginalRunId(
140+
eventStore,
128141
environmentId,
129-
startCreatedAt: createdAt,
130-
endCreatedAt: completedAt ?? undefined,
131-
});
132-
133-
if (!span) {
134-
return;
135-
}
142+
spanId,
143+
traceId,
144+
createdAt,
145+
completedAt ?? undefined
146+
);
136147

137-
const run = await this.findRun({ span, spanId });
148+
const run = await this.findRun({ originalRunId, spanId, environmentId });
138149

139150
if (!run) {
140151
return;
@@ -259,7 +270,7 @@ export class SpanPresenter extends BasePresenter {
259270
workerQueue: run.workerQueue,
260271
traceId: run.traceId,
261272
spanId: run.spanId,
262-
isCached: !!span.originalRun,
273+
isCached: !!originalRunId,
263274
machinePreset: machine?.name,
264275
externalTraceId,
265276
};
@@ -294,7 +305,15 @@ export class SpanPresenter extends BasePresenter {
294305
};
295306
}
296307

297-
async findRun({ span, spanId }: { span: GetSpanResult; spanId: string }) {
308+
async findRun({
309+
originalRunId,
310+
spanId,
311+
environmentId,
312+
}: {
313+
originalRunId?: string;
314+
spanId: string;
315+
environmentId: string;
316+
}) {
298317
const run = await this._replica.taskRun.findFirst({
299318
select: {
300319
id: true,
@@ -404,12 +423,14 @@ export class SpanPresenter extends BasePresenter {
404423
},
405424
},
406425
},
407-
where: span.originalRun
426+
where: originalRunId
408427
? {
409-
friendlyId: span.originalRun,
428+
friendlyId: originalRunId,
429+
runtimeEnvironmentId: environmentId,
410430
}
411431
: {
412432
spanId,
433+
runtimeEnvironmentId: environmentId,
413434
},
414435
});
415436

@@ -418,27 +439,32 @@ export class SpanPresenter extends BasePresenter {
418439

419440
async #getSpan({
420441
eventStore,
442+
eventRepository,
443+
traceId,
421444
spanId,
422445
environmentId,
423446
projectId,
424447
createdAt,
425448
completedAt,
426449
}: {
450+
eventRepository: IEventRepository;
451+
traceId: string;
427452
spanId: string;
428453
environmentId: string;
429454
projectId: string;
430455
eventStore: TaskEventStoreTable;
431456
createdAt: Date;
432457
completedAt: Date | null;
433458
}) {
434-
const span = await eventRepository.getSpan({
435-
storeTable: eventStore,
436-
spanId,
459+
const span = await eventRepository.getSpan(
460+
eventStore,
437461
environmentId,
438-
startCreatedAt: createdAt,
439-
endCreatedAt: completedAt ?? undefined,
440-
options: { includeDebugLogs: true },
441-
});
462+
spanId,
463+
traceId,
464+
createdAt,
465+
completedAt ?? undefined,
466+
{ includeDebugLogs: true }
467+
);
442468

443469
if (!span) {
444470
return;
@@ -451,23 +477,29 @@ export class SpanPresenter extends BasePresenter {
451477
spanId: true,
452478
createdAt: true,
453479
number: true,
454-
lockedToVersion: {
455-
select: {
456-
version: true,
457-
},
458-
},
480+
taskVersion: true,
459481
},
460482
where: {
461483
parentSpanId: spanId,
462484
},
463485
});
464486

465487
const data = {
466-
...span,
488+
spanId: span.spanId,
489+
parentId: span.parentId,
490+
message: span.message,
491+
isError: span.isError,
492+
isPartial: span.isPartial,
493+
isCancelled: span.isCancelled,
494+
level: span.level,
495+
startTime: span.startTime,
496+
duration: span.duration,
467497
events: span.events,
498+
style: span.style,
468499
properties: span.properties ? JSON.stringify(span.properties, null, 2) : undefined,
500+
entity: span.entity,
501+
metadata: span.metadata,
469502
triggeredRuns,
470-
showActionBar: span.show?.actions === true,
471503
};
472504

473505
switch (span.entity.type) {

0 commit comments

Comments
 (0)