Skip to content

Commit 407bc2c

Browse files
committed
feat: add webhook producer to service-utils
1 parent 830125c commit 407bc2c

File tree

5 files changed

+209
-47
lines changed

5 files changed

+209
-47
lines changed

.changeset/slimy-tigers-trade.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": minor
3+
---
4+
5+
feat: Add webhook producer to service-utils
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Shared KafkaProducer Pattern
2+
3+
When using multiple event producers in the same service (e.g., `UsageV2Producer` and `WebhookEventProducer`), it's recommended to share a single `KafkaProducer` instance to avoid multiple TCP connections.
4+
5+
## Recommended Usage
6+
7+
```typescript
8+
import { KafkaProducer, UsageV2Producer, WebhookEventProducer } from '@thirdweb-dev/service-utils/node';
9+
10+
// Create one shared KafkaProducer
11+
const kafkaProducer = new KafkaProducer({
12+
producerName: "my-service",
13+
kafkaServers: process.env.KAFKA_SERVERS,
14+
username: process.env.KAFKA_USERNAME,
15+
password: process.env.KAFKA_PASSWORD,
16+
});
17+
18+
// Share it between producers
19+
const usageProducer = new UsageV2Producer({
20+
kafkaProducer,
21+
source: "storage"
22+
});
23+
24+
const webhookProducer = new WebhookEventProducer({
25+
kafkaProducer
26+
});
27+
28+
// Use the producers
29+
await Promise.all([
30+
usageProducer.sendEvents([...]),
31+
webhookProducer.sendEvents([...])
32+
]);
33+
34+
// Shutdown: disconnect the shared producer directly
35+
await kafkaProducer.disconnect();
36+
```
37+
38+
## Legacy Usage (Still Supported)
39+
40+
Each producer can still create its own connection:
41+
42+
```typescript
43+
const usageProducer = new UsageV2Producer({
44+
producerName: "my-service",
45+
kafkaServers: process.env.KAFKA_SERVERS,
46+
source: "storage",
47+
username: process.env.KAFKA_USERNAME,
48+
password: process.env.KAFKA_PASSWORD,
49+
});
50+
51+
const webhookProducer = new WebhookEventProducer({
52+
producerName: "my-service",
53+
kafkaServers: process.env.KAFKA_SERVERS,
54+
username: process.env.KAFKA_USERNAME,
55+
password: process.env.KAFKA_PASSWORD,
56+
});
57+
58+
// Note: With legacy usage, you need to disconnect each producer's internal KafkaProducer
59+
// This is less efficient and not recommended for services using multiple producers
60+
```
61+
62+
## Benefits of Shared Producer
63+
64+
- **Resource efficiency**: One TCP connection instead of multiple
65+
- **Consistent configuration**: Single source of connection config
66+
- **Simplified shutdown**: One disconnect call instead of multiple
67+
- **Better performance**: Reduced connection overhead

packages/service-utils/src/node/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export * from "../core/usage.js";
1919
export * from "../core/usageV2.js";
2020
export * from "./kafka.js";
2121
export * from "./usageV2.js";
22+
export * from "./webhookProducer.js";
2223

2324
type NodeServiceConfig = CoreServiceConfig;
2425

packages/service-utils/src/node/usageV2.ts

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,48 +6,59 @@ import {
66
} from "../core/usageV2.js";
77
import { KafkaProducer } from "./kafka.js";
88

9-
const TEAM_ID_PREFIX = "team_";
10-
const PROJECT_ID_PREFIX = "prj_";
11-
129
/**
13-
* Creates a UsageV2Producer which opens a persistent TCP connection.
14-
* This class is thread-safe so your service should re-use one instance.
10+
* Creates a producer for usage events.
1511
*
1612
* Example:
1713
* ```ts
18-
* usageV2 = new UsageV2Producer(..)
19-
* await usageV2.sendEvents(events)
20-
* // Non-blocking:
21-
* // void usageV2.sendEvents(events).catch((e) => console.error(e))
14+
* const kafkaProducer = new KafkaProducer({...});
15+
* const usageV2 = new UsageV2Producer({ kafkaProducer, source: "storage" });
16+
* await usageV2.sendEvents(events);
2217
* ```
2318
*/
2419
export class UsageV2Producer {
2520
private kafkaProducer: KafkaProducer;
2621
private topic: string;
2722

28-
constructor(config: {
29-
/**
30-
* A descriptive name for your service. Example: "storage-server"
31-
*/
32-
producerName: string;
33-
/**
34-
* A comma-separated list of `host[:port]` Kafka servers.
35-
*/
36-
kafkaServers: string;
37-
/**
38-
* The product where usage is coming from.
39-
*/
40-
source: UsageV2Source;
41-
42-
username: string;
43-
password: string;
44-
}) {
45-
this.kafkaProducer = new KafkaProducer({
46-
kafkaServers: config.kafkaServers,
47-
password: config.password,
48-
producerName: config.producerName,
49-
username: config.username,
50-
});
23+
constructor(
24+
config:
25+
| {
26+
/**
27+
* Shared KafkaProducer instance.
28+
*/
29+
kafkaProducer: KafkaProducer;
30+
/**
31+
* The product where usage is coming from.
32+
*/
33+
source: UsageV2Source;
34+
}
35+
| {
36+
/**
37+
* A descriptive name for your service. Example: "storage-server"
38+
*/
39+
producerName: string;
40+
/**
41+
* A comma-separated list of `host[:port]` Kafka servers.
42+
*/
43+
kafkaServers: string;
44+
/**
45+
* The product where usage is coming from.
46+
*/
47+
source: UsageV2Source;
48+
username: string;
49+
password: string;
50+
},
51+
) {
52+
if ("kafkaProducer" in config) {
53+
this.kafkaProducer = config.kafkaProducer;
54+
} else {
55+
this.kafkaProducer = new KafkaProducer({
56+
kafkaServers: config.kafkaServers,
57+
password: config.password,
58+
producerName: config.producerName,
59+
username: config.username,
60+
});
61+
}
5162
this.topic = getTopicName(config.source);
5263
}
5364

@@ -61,29 +72,21 @@ export class UsageV2Producer {
6172
* @param events
6273
*/
6374
async sendEvents(events: UsageV2Event[]): Promise<void> {
64-
const parsedEvents = events.map((event) => ({
75+
const parsedEvents: UsageV2Event[] = events.map((event) => ({
6576
...event,
6677
// Default to now.
6778
created_at: event.created_at ?? new Date(),
6879
// Default to a generated UUID.
6980
id: event.id ?? randomUUID(),
70-
// Remove the "prj_" prefix, if any.
71-
project_id: event.project_id?.startsWith(PROJECT_ID_PREFIX)
72-
? event.project_id.slice(PROJECT_ID_PREFIX.length)
81+
// Remove the "prj_" prefix.
82+
project_id: event.project_id?.startsWith("prj_")
83+
? event.project_id.slice(4)
7384
: event.project_id,
74-
// Remove the "team_" prefix, if any.
75-
team_id: event.team_id.startsWith(TEAM_ID_PREFIX)
76-
? event.team_id.slice(TEAM_ID_PREFIX.length)
85+
// Remove the "team_" prefix.
86+
team_id: event.team_id.startsWith("team_")
87+
? event.team_id.slice(5)
7788
: event.team_id,
7889
}));
7990
await this.kafkaProducer.send(this.topic, parsedEvents);
8091
}
81-
82-
/**
83-
* Disconnects UsageV2Producer.
84-
* Useful when shutting down the service to flush in-flight events.
85-
*/
86-
async disconnect() {
87-
await this.kafkaProducer.disconnect();
88-
}
8992
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { randomUUID } from "node:crypto";
2+
import { KafkaProducer } from "./kafka.js";
3+
4+
interface WebhookEvent extends Record<string, unknown> {
5+
id?: `evt_${string}`;
6+
teamId: string;
7+
projectId?: string;
8+
createdAt?: Date;
9+
/**
10+
* This should match your model defined in api-server.
11+
*/
12+
payload: Record<string, unknown>;
13+
}
14+
15+
/**
16+
* Creates a producer for webhook events.
17+
*
18+
* Example:
19+
* ```ts
20+
* const kafkaProducer = new KafkaProducer({...});
21+
* const webhookProducer = new WebhookEventProducer({ kafkaProducer });
22+
* await webhookProducer.sendEvents(events);
23+
* ```
24+
*/
25+
export class WebhookEventProducer {
26+
private kafkaProducer: KafkaProducer;
27+
28+
constructor(
29+
config:
30+
| {
31+
/**
32+
* Shared KafkaProducer instance.
33+
*/
34+
kafkaProducer: KafkaProducer;
35+
}
36+
| {
37+
/**
38+
* A descriptive name for your service. Example: "webhook-service"
39+
*/
40+
producerName: string;
41+
/**
42+
* A comma-separated list of `host[:port]` Kafka servers.
43+
*/
44+
kafkaServers: string;
45+
username: string;
46+
password: string;
47+
},
48+
) {
49+
if ("kafkaProducer" in config) {
50+
this.kafkaProducer = config.kafkaProducer;
51+
} else {
52+
this.kafkaProducer = new KafkaProducer({
53+
kafkaServers: config.kafkaServers,
54+
password: config.password,
55+
producerName: config.producerName,
56+
username: config.username,
57+
});
58+
}
59+
}
60+
61+
/**
62+
* Emit a webhook event.
63+
* This method may throw. To call this non-blocking:
64+
* ```ts
65+
* void webhookProducer.sendEvents(events).catch((e) => console.error(e))
66+
* ```
67+
*/
68+
async sendEvents(topic: string, events: WebhookEvent[]): Promise<void> {
69+
const parsedEvents: WebhookEvent[] = events.map((event) => ({
70+
// Default to now.
71+
created_at: event.createdAt ?? new Date(),
72+
// Default to a generated UUID.
73+
id: event.id ?? `evt_${randomUUID()}`,
74+
payload: event.payload,
75+
// Add the "prj_" prefix.
76+
projectId: event.projectId?.startsWith("prj_")
77+
? event.projectId
78+
: `prj_${event.projectId}`,
79+
// Add the "team_" prefix.
80+
teamId: event.teamId.startsWith("team_")
81+
? event.teamId
82+
: `team_${event.teamId}`,
83+
}));
84+
await this.kafkaProducer.send(topic, parsedEvents);
85+
}
86+
}

0 commit comments

Comments
 (0)