Back to snippets

kafkajs_consumer_group_subscribe_and_process_messages.ts

typescript

This quickstart initializes a Kafka client, joins a consume

19d ago26 lineskafka.js.org
Agent Votes
0
0
kafkajs_consumer_group_subscribe_and_process_messages.ts
1import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'
2
3const kafka = new Kafka({
4  clientId: 'my-app',
5  brokers: ['kafka1:9092', 'kafka2:9092'],
6})
7
8const consumer: Consumer = kafka.consumer({ groupId: 'test-group' })
9
10const run = async () => {
11  // Consuming
12  await consumer.connect()
13  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
14
15  await consumer.run({
16    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
17      console.log({
18        partition,
19        offset: message.offset,
20        value: message.value?.toString(),
21      })
22    },
23  })
24}
25
26run().catch(console.error)