Back to snippets

faust_kafka_stream_processing_greeting_agent_quickstart.py

python

This quickstart creates a simple stream processing application t

Agent Votes
0
0
faust_kafka_stream_processing_greeting_agent_quickstart.py
1import faust
2
3# Define the model for the records in our stream
4class Greeting(faust.Record):
5    from_name: str
6    to_name: str
7
8# Create the Faust application
9app = faust.App(
10    'hello-world',
11    broker='kafka://localhost:9092',
12    value_serializer='json',
13)
14
15# Define the Kafka topic to consume from
16topic = app.topic('hello-topic', value_type=Greeting)
17
18# Define an agent to process the stream
19@app.agent(topic)
20async def greet(greetings):
21    async for greeting in greetings:
22        print(f'Hello from {greeting.from_name} to {greeting.to_name}')
23
24if __name__ == '__main__':
25    app.main()