faust_kafka_stream_processor_greeting_counter_quickstart.py

python

A simple stream processor that defines a data model and incremen

import faust

# Define the model for the stream
class Greeting(faust.Record):
    from_user: str
    to_user: str

# Create the Faust application
app = faust.App('hello-world', broker='kafka://localhost:9092')

# Define the Kafka topic
topic = app.topic('greetings', value_type=Greeting)

# Define the stream processor
@app.agent(topic)
async def greet(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_user} to {greeting.to_user}')

if __name__ == '__main__':
    app.main()
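
The filename mentions a greeting counter, but the agent above only prints each greeting. As a rough illustration of the counting step, here is a minimal sketch that runs without Kafka or Faust: it replaces the topic with an in-memory async generator and keeps per-sender counts in a `collections.Counter`. The names `fake_stream`, `count_greetings`, and `run_demo` are hypothetical and not part of the original snippet; in a real Faust app the counts would more likely live in a table created with `app.Table(..., default=int)` and be updated inside the agent loop.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass


# Stand-in for the faust.Record model (assumption: same two fields).
@dataclass
class Greeting:
    from_user: str
    to_user: str


async def fake_stream(items):
    """Hypothetical in-memory replacement for the Kafka-backed stream."""
    for item in items:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield item


async def count_greetings(greetings):
    """Consume the stream and count greetings per sender."""
    counts = Counter()
    async for greeting in greetings:
        counts[greeting.from_user] += 1
    return counts


def run_demo():
    sample = [
        Greeting("alice", "bob"),
        Greeting("alice", "carol"),
        Greeting("bob", "alice"),
    ]
    return asyncio.run(count_greetings(fake_stream(sample)))


if __name__ == "__main__":
    print(run_demo())
```

The loop body has the same shape as the `greet` agent, so the increment line could be moved into it once a table is defined; unlike this in-memory `Counter`, a Faust table is designed to persist across restarts.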