Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slimy-tigers-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@thirdweb-dev/service-utils": minor
---

feat: Add webhook producer to service-utils
1 change: 1 addition & 0 deletions packages/service-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
},
"dependencies": {
"@confluentinc/kafka-javascript": "1.3.2",
"@paralleldrive/cuid2": "^2.2.2",
"aws4fetch": "1.0.20",
"zod": "3.25.62"
},
Expand Down
1 change: 1 addition & 0 deletions packages/service-utils/src/node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from "../core/usage.js";
export * from "../core/usageV2.js";
export * from "./kafka.js";
export * from "./usageV2.js";
export * from "./webhookProducer.js";

type NodeServiceConfig = CoreServiceConfig;

Expand Down
98 changes: 51 additions & 47 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,60 @@ import {
} from "../core/usageV2.js";
import { KafkaProducer } from "./kafka.js";

const TEAM_ID_PREFIX = "team_";
const PROJECT_ID_PREFIX = "prj_";

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
* This class is thread-safe so your service should re-use one instance.
* Creates a producer for usage events.
*
* Example:
* ```ts
* usageV2 = new UsageV2Producer(..)
* await usageV2.sendEvents(events)
* // Non-blocking:
* // void usageV2.sendEvents(events).catch((e) => console.error(e))
* const kafkaProducer = new KafkaProducer({...});
* const usageV2 = new UsageV2Producer({ kafkaProducer, source: "storage" });
* await usageV2.sendEvents(events);
* ```
*/
export class UsageV2Producer {
private kafkaProducer: KafkaProducer;
private topic: string;

constructor(config: {
/**
* A descriptive name for your service. Example: "storage-server"
*/
producerName: string;
/**
* A comma-separated list of `host[:port]` Kafka servers.
*/
kafkaServers: string;
/**
* The product where usage is coming from.
*/
source: UsageV2Source;

username: string;
password: string;
}) {
this.kafkaProducer = new KafkaProducer({
kafkaServers: config.kafkaServers,
password: config.password,
producerName: config.producerName,
username: config.username,
});
constructor(
config:
| {
/**
* Shared KafkaProducer instance.
*/
kafkaProducer: KafkaProducer;
/**
* The product where usage is coming from.
*/
source: UsageV2Source;
}
| {
/**
* A descriptive name for your service. Example: "storage-server"
*/
producerName: string;
/**
* A comma-separated list of `host[:port]` Kafka servers.
* @deprecated Instantiate and pass in `kafkaProducer` instead.
*/
kafkaServers: string;
/**
* The product where usage is coming from.
*/
source: UsageV2Source;
username: string;
password: string;
},
) {
if ("kafkaProducer" in config) {
this.kafkaProducer = config.kafkaProducer;
} else {
this.kafkaProducer = new KafkaProducer({
kafkaServers: config.kafkaServers,
password: config.password,
producerName: config.producerName,
username: config.username,
});
}
this.topic = getTopicName(config.source);
}

Expand All @@ -61,29 +73,21 @@ export class UsageV2Producer {
* @param events
*/
async sendEvents(events: UsageV2Event[]): Promise<void> {
const parsedEvents = events.map((event) => ({
const parsedEvents: UsageV2Event[] = events.map((event) => ({
...event,
// Default to now.
created_at: event.created_at ?? new Date(),
// Default to a generated UUID.
id: event.id ?? randomUUID(),
// Remove the "prj_" prefix, if any.
project_id: event.project_id?.startsWith(PROJECT_ID_PREFIX)
? event.project_id.slice(PROJECT_ID_PREFIX.length)
// Remove the "prj_" prefix.
project_id: event.project_id?.startsWith("prj_")
? event.project_id.slice(4)
: event.project_id,
// Remove the "team_" prefix, if any.
team_id: event.team_id.startsWith(TEAM_ID_PREFIX)
? event.team_id.slice(TEAM_ID_PREFIX.length)
// Remove the "team_" prefix.
team_id: event.team_id.startsWith("team_")
? event.team_id.slice(5)
: event.team_id,
}));
await this.kafkaProducer.send(this.topic, parsedEvents);
}

/**
* Disconnects UsageV2Producer.
* Useful when shutting down the service to flush in-flight events.
*/
async disconnect() {
await this.kafkaProducer.disconnect();
}
}
61 changes: 61 additions & 0 deletions packages/service-utils/src/node/webhookProducer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import assert from "node:assert";
import { createId } from "@paralleldrive/cuid2";
import type { KafkaProducer } from "./kafka.js";

// Shape of a single webhook event accepted by WebhookEventProducer.sendEvents.
// Extends Record<string, unknown> so callers may attach extra fields alongside
// the typed ones; all extras are passed through to Kafka unchanged.
interface WebhookEvent extends Record<string, unknown> {
  // Optional event ID; when omitted, sendEvents generates one of the form
  // `evt_<cuid2>`.
  id?: `evt_${string}`;
  // Must start with "team_" — asserted at send time.
  teamId: string;
  // When present, must start with "prj_" — asserted at send time.
  projectId?: string;
  // Optional creation timestamp; sendEvents fills in a default (now) when
  // omitted.
  createdAt?: Date;
  /**
   * This should match your model defined in api-server.
   */
  payload: Record<string, unknown>;
}

/**
* Creates a producer for webhook events.
*
* Example:
* ```ts
* const kafkaProducer = new KafkaProducer({...});
* const webhookProducer = new WebhookEventProducer({ kafkaProducer });
* await webhookProducer.sendEvents("your.topic.name", events);
* ```
*/
export class WebhookEventProducer {
  private kafkaProducer: KafkaProducer;

  /**
   * @param config.kafkaProducer - A shared, already-configured KafkaProducer
   * instance used to publish events.
   */
  constructor(config: { kafkaProducer: KafkaProducer }) {
    this.kafkaProducer = config.kafkaProducer;
  }

  /**
   * Emit webhook events to the given topic.
   *
   * Validates ID prefixes, fills in defaults for `id` and `createdAt`, then
   * forwards the batch to the underlying Kafka producer.
   *
   * This method may throw. To call this non-blocking:
   * ```ts
   * void webhookProducer.sendEvents(topic, events).catch((e) => console.error(e))
   * ```
   *
   * @param topic - The Kafka topic to publish to.
   * @param events - The webhook events to publish.
   * @throws AssertionError if any event's `teamId` / `projectId` is missing
   * its expected prefix; rethrows any failure from the producer send.
   */
  async sendEvents(topic: string, events: WebhookEvent[]): Promise<void> {
    const parsedEvents: WebhookEvent[] = events.map((event) => {
      assert(
        event.teamId.startsWith("team_"),
        "teamId must start with 'team_'",
      );
      assert(
        !event.projectId || event.projectId.startsWith("prj_"),
        "projectId must start with 'prj_'",
      );

      return {
        ...event,
        // Default to now. Fix: write the camelCase `createdAt` field declared
        // on WebhookEvent — previously this wrote a stray snake_case
        // `created_at` key and left the typed `createdAt` unset.
        // NOTE(review): if the downstream consumer expects snake_case on the
        // wire, confirm against the topic schema before relying on this.
        createdAt: event.createdAt ?? new Date(),
        // Default to a generated cuid2 ID (not a UUID), prefixed with "evt_".
        id: event.id ?? `evt_${createId()}`,
      };
    });
    await this.kafkaProducer.send(topic, parsedEvents);
  }
}
Loading
Loading