Skip to content

Commit c5f9921

Browse files
authored
feat: add webhook producer to service-utils (#7439)
1 parent 830125c commit c5f9921

File tree

6 files changed

+164
-216
lines changed

6 files changed

+164
-216
lines changed

.changeset/slimy-tigers-trade.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": minor
3+
---
4+
5+
feat: Add webhook producer to service-utils

packages/service-utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
},
66
"dependencies": {
77
"@confluentinc/kafka-javascript": "1.3.2",
8+
"@paralleldrive/cuid2": "^2.2.2",
89
"aws4fetch": "1.0.20",
910
"zod": "3.25.62"
1011
},

packages/service-utils/src/node/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export * from "../core/usage.js";
1919
export * from "../core/usageV2.js";
2020
export * from "./kafka.js";
2121
export * from "./usageV2.js";
22+
export * from "./webhookProducer.js";
2223

2324
type NodeServiceConfig = CoreServiceConfig;
2425

packages/service-utils/src/node/usageV2.ts

Lines changed: 51 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,48 +6,60 @@ import {
66
} from "../core/usageV2.js";
77
import { KafkaProducer } from "./kafka.js";
88

9-
const TEAM_ID_PREFIX = "team_";
10-
const PROJECT_ID_PREFIX = "prj_";
11-
129
/**
13-
* Creates a UsageV2Producer which opens a persistent TCP connection.
14-
* This class is thread-safe so your service should re-use one instance.
10+
* Creates a producer for usage events.
1511
*
1612
* Example:
1713
* ```ts
18-
* usageV2 = new UsageV2Producer(..)
19-
* await usageV2.sendEvents(events)
20-
* // Non-blocking:
21-
* // void usageV2.sendEvents(events).catch((e) => console.error(e))
14+
* const kafkaProducer = new KafkaProducer({...});
15+
* const usageV2 = new UsageV2Producer({ kafkaProducer, source: "storage" });
16+
* await usageV2.sendEvents(events);
2217
* ```
2318
*/
2419
export class UsageV2Producer {
2520
private kafkaProducer: KafkaProducer;
2621
private topic: string;
2722

28-
constructor(config: {
29-
/**
30-
* A descriptive name for your service. Example: "storage-server"
31-
*/
32-
producerName: string;
33-
/**
34-
* A comma-separated list of `host[:port]` Kafka servers.
35-
*/
36-
kafkaServers: string;
37-
/**
38-
* The product where usage is coming from.
39-
*/
40-
source: UsageV2Source;
41-
42-
username: string;
43-
password: string;
44-
}) {
45-
this.kafkaProducer = new KafkaProducer({
46-
kafkaServers: config.kafkaServers,
47-
password: config.password,
48-
producerName: config.producerName,
49-
username: config.username,
50-
});
23+
constructor(
24+
config:
25+
| {
26+
/**
27+
* Shared KafkaProducer instance.
28+
*/
29+
kafkaProducer: KafkaProducer;
30+
/**
31+
* The product where usage is coming from.
32+
*/
33+
source: UsageV2Source;
34+
}
35+
| {
36+
/**
37+
* A descriptive name for your service. Example: "storage-server"
38+
*/
39+
producerName: string;
40+
/**
41+
* A comma-separated list of `host[:port]` Kafka servers.
42+
* @deprecated: Instantiate and pass in `kafkaProducer` instead.
43+
*/
44+
kafkaServers: string;
45+
/**
46+
* The product where usage is coming from.
47+
*/
48+
source: UsageV2Source;
49+
username: string;
50+
password: string;
51+
},
52+
) {
53+
if ("kafkaProducer" in config) {
54+
this.kafkaProducer = config.kafkaProducer;
55+
} else {
56+
this.kafkaProducer = new KafkaProducer({
57+
kafkaServers: config.kafkaServers,
58+
password: config.password,
59+
producerName: config.producerName,
60+
username: config.username,
61+
});
62+
}
5163
this.topic = getTopicName(config.source);
5264
}
5365

@@ -61,29 +73,21 @@ export class UsageV2Producer {
6173
* @param events
6274
*/
6375
async sendEvents(events: UsageV2Event[]): Promise<void> {
64-
const parsedEvents = events.map((event) => ({
76+
const parsedEvents: UsageV2Event[] = events.map((event) => ({
6577
...event,
6678
// Default to now.
6779
created_at: event.created_at ?? new Date(),
6880
// Default to a generated UUID.
6981
id: event.id ?? randomUUID(),
70-
// Remove the "prj_" prefix, if any.
71-
project_id: event.project_id?.startsWith(PROJECT_ID_PREFIX)
72-
? event.project_id.slice(PROJECT_ID_PREFIX.length)
82+
// Remove the "prj_" prefix.
83+
project_id: event.project_id?.startsWith("prj_")
84+
? event.project_id.slice(4)
7385
: event.project_id,
74-
// Remove the "team_" prefix, if any.
75-
team_id: event.team_id.startsWith(TEAM_ID_PREFIX)
76-
? event.team_id.slice(TEAM_ID_PREFIX.length)
86+
// Remove the "team_" prefix.
87+
team_id: event.team_id.startsWith("team_")
88+
? event.team_id.slice(5)
7789
: event.team_id,
7890
}));
7991
await this.kafkaProducer.send(this.topic, parsedEvents);
8092
}
81-
82-
/**
83-
* Disconnects UsageV2Producer.
84-
* Useful when shutting down the service to flush in-flight events.
85-
*/
86-
async disconnect() {
87-
await this.kafkaProducer.disconnect();
88-
}
8993
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import assert from "node:assert";
2+
import { createId } from "@paralleldrive/cuid2";
3+
import type { KafkaProducer } from "./kafka.js";
4+
5+
interface WebhookEvent extends Record<string, unknown> {
6+
id?: `evt_${string}`;
7+
teamId: string;
8+
projectId?: string;
9+
createdAt?: Date;
10+
/**
11+
* This should match your model defined in api-server.
12+
*/
13+
payload: Record<string, unknown>;
14+
}
15+
16+
/**
17+
* Creates a producer for webhook events.
18+
*
19+
* Example:
20+
* ```ts
21+
* const kafkaProducer = new KafkaProducer({...});
22+
* const webhookProducer = new WebhookEventProducer({ kafkaProducer });
23+
* await webhookProducer.sendEvents("your.topic.name", events);
24+
* ```
25+
*/
26+
export class WebhookEventProducer {
27+
private kafkaProducer: KafkaProducer;
28+
29+
constructor(config: { kafkaProducer: KafkaProducer }) {
30+
this.kafkaProducer = config.kafkaProducer;
31+
}
32+
33+
/**
34+
* Emit a webhook event.
35+
* This method may throw. To call this non-blocking:
36+
* ```ts
37+
* void webhookProducer.sendEvents(events).catch((e) => console.error(e))
38+
* ```
39+
*/
40+
async sendEvents(topic: string, events: WebhookEvent[]): Promise<void> {
41+
const parsedEvents: WebhookEvent[] = events.map((event) => {
42+
assert(
43+
event.teamId.startsWith("team_"),
44+
"teamId must start with 'team_'",
45+
);
46+
assert(
47+
!event.projectId || event.projectId.startsWith("prj_"),
48+
"projectId must start with 'prj_'",
49+
);
50+
51+
return {
52+
...event,
53+
// Default to now.
54+
created_at: event.createdAt ?? new Date(),
55+
// Default to a generated UUID.
56+
id: event.id ?? `evt_${createId()}`,
57+
};
58+
});
59+
await this.kafkaProducer.send(topic, parsedEvents);
60+
}
61+
}

0 commit comments

Comments
 (0)