@@ -6,48 +6,60 @@ import {
66} from "../core/usageV2.js" ;
77import { KafkaProducer } from "./kafka.js" ;
88
9- const TEAM_ID_PREFIX = "team_" ;
10- const PROJECT_ID_PREFIX = "prj_" ;
11-
129/**
13- * Creates a UsageV2Producer which opens a persistent TCP connection.
14- * This class is thread-safe so your service should re-use one instance.
10+ * Creates a producer for usage events.
1511 *
1612 * Example:
1713 * ```ts
18- * usageV2 = new UsageV2Producer(..)
19- * await usageV2.sendEvents(events)
20- * // Non-blocking:
21- * // void usageV2.sendEvents(events).catch((e) => console.error(e))
14+ * const kafkaProducer = new KafkaProducer({...});
15+ * const usageV2 = new UsageV2Producer({ kafkaProducer, source: "storage" });
16+ * await usageV2.sendEvents(events);
2217 * ```
2318 */
2419export class UsageV2Producer {
2520 private kafkaProducer : KafkaProducer ;
2621 private topic : string ;
2722
28- constructor ( config : {
29- /**
30- * A descriptive name for your service. Example: "storage-server"
31- */
32- producerName : string ;
33- /**
34- * A comma-separated list of `host[:port]` Kafka servers.
35- */
36- kafkaServers : string ;
37- /**
38- * The product where usage is coming from.
39- */
40- source : UsageV2Source ;
41-
42- username : string ;
43- password : string ;
44- } ) {
45- this . kafkaProducer = new KafkaProducer ( {
46- kafkaServers : config . kafkaServers ,
47- password : config . password ,
48- producerName : config . producerName ,
49- username : config . username ,
50- } ) ;
23+ constructor (
24+ config :
25+ | {
26+ /**
27+ * Shared KafkaProducer instance.
28+ */
29+ kafkaProducer : KafkaProducer ;
30+ /**
31+ * The product where usage is coming from.
32+ */
33+ source : UsageV2Source ;
34+ }
35+ | {
36+ /**
37+ * A descriptive name for your service. Example: "storage-server"
38+ */
39+ producerName : string ;
40+ /**
41+ * A comma-separated list of `host[:port]` Kafka servers.
42+ * @deprecated : Instantiate and pass in `kafkaProducer` instead.
43+ */
44+ kafkaServers : string ;
45+ /**
46+ * The product where usage is coming from.
47+ */
48+ source : UsageV2Source ;
49+ username : string ;
50+ password : string ;
51+ } ,
52+ ) {
53+ if ( "kafkaProducer" in config ) {
54+ this . kafkaProducer = config . kafkaProducer ;
55+ } else {
56+ this . kafkaProducer = new KafkaProducer ( {
57+ kafkaServers : config . kafkaServers ,
58+ password : config . password ,
59+ producerName : config . producerName ,
60+ username : config . username ,
61+ } ) ;
62+ }
5163 this . topic = getTopicName ( config . source ) ;
5264 }
5365
@@ -61,29 +73,21 @@ export class UsageV2Producer {
6173 * @param events
6274 */
6375 async sendEvents ( events : UsageV2Event [ ] ) : Promise < void > {
64- const parsedEvents = events . map ( ( event ) => ( {
76+ const parsedEvents : UsageV2Event [ ] = events . map ( ( event ) => ( {
6577 ...event ,
6678 // Default to now.
6779 created_at : event . created_at ?? new Date ( ) ,
6880 // Default to a generated UUID.
6981 id : event . id ?? randomUUID ( ) ,
70- // Remove the "prj_" prefix, if any .
71- project_id : event . project_id ?. startsWith ( PROJECT_ID_PREFIX )
72- ? event . project_id . slice ( PROJECT_ID_PREFIX . length )
82+ // Remove the "prj_" prefix.
83+ project_id : event . project_id ?. startsWith ( "prj_" )
84+ ? event . project_id . slice ( 4 )
7385 : event . project_id ,
74- // Remove the "team_" prefix, if any .
75- team_id : event . team_id . startsWith ( TEAM_ID_PREFIX )
76- ? event . team_id . slice ( TEAM_ID_PREFIX . length )
86+ // Remove the "team_" prefix.
87+ team_id : event . team_id . startsWith ( "team_" )
88+ ? event . team_id . slice ( 5 )
7789 : event . team_id ,
7890 } ) ) ;
7991 await this . kafkaProducer . send ( this . topic , parsedEvents ) ;
8092 }
81-
82- /**
83- * Disconnects UsageV2Producer.
84- * Useful when shutting down the service to flush in-flight events.
85- */
86- async disconnect ( ) {
87- await this . kafkaProducer . disconnect ( ) ;
88- }
8993}
0 commit comments