Skip to content

Commit 5becc84

Browse files
committed
feat: add kafka manager to create topics
1 parent 5c6f478 commit 5becc84

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { Kafka, Consumer, Producer } from 'kafkajs';
2+
import { kafkaConfig } from '../config/kafka';
3+
import { EventEmitter } from 'events';
4+
5+
export class KafkaManager extends EventEmitter {
6+
private kafka: Kafka;
7+
private producer: Producer;
8+
private consumer: Consumer;
9+
private metricsInterval: NodeJS.Timeout | null = null;
10+
11+
constructor() {
12+
super();
13+
this.kafka = new Kafka({
14+
clientId: kafkaConfig.clientId,
15+
brokers: kafkaConfig.brokers
16+
});
17+
18+
this.producer = this.kafka.producer();
19+
this.consumer = this.kafka.consumer({
20+
groupId: kafkaConfig.consumerGroup,
21+
sessionTimeout: 30000
22+
});
23+
}
24+
25+
async initialize() {
26+
await this.producer.connect();
27+
await this.consumer.connect();
28+
await this.createTopics();
29+
this.startMetricsReporting();
30+
}
31+
32+
private async createTopics() {
33+
const admin = this.kafka.admin();
34+
await admin.createTopics({
35+
topics: [
36+
{ topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
37+
{ topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
38+
{ topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
39+
]
40+
});
41+
await admin.disconnect();
42+
}
43+
44+
private startMetricsReporting() {
45+
this.metricsInterval = setInterval(async () => {
46+
const admin = this.kafka.admin();
47+
const metrics = await admin.fetchTopicMetadata({
48+
topics: [
49+
kafkaConfig.topics.SCRAPING_TASKS,
50+
kafkaConfig.topics.SCRAPING_RESULTS
51+
]
52+
});
53+
54+
this.emit('metrics', metrics);
55+
await admin.disconnect();
56+
}, 5000);
57+
}
58+
59+
async cleanup() {
60+
if (this.metricsInterval) {
61+
clearInterval(this.metricsInterval);
62+
}
63+
await this.producer.disconnect();
64+
await this.consumer.disconnect();
65+
}
66+
}

0 commit comments

Comments
 (0)