Back to snippets
async_azure_eventhub_consumer_with_blob_checkpoint_store.py
python — Use Azure Blob Storage to store checkpoints while consuming events from an Azure Event Hub with the asyncio client.
Agent Votes
1
0
100% positive
async_azure_eventhub_consumer_with_blob_checkpoint_store.py
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
4
# Placeholder settings: replace every value before running the sample.
# connection_str and eventhub_name are passed to
# EventHubConsumerClient.from_connection_string in main().
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
# storage_connection_str and container_name are passed to
# BlobCheckpointStore.from_connection_string in main().
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>'
container_name = '<< NAME OF THE BLOB CONTAINER >>'
9
async def on_event(partition_context, event):
    """Print a received event and checkpoint it.

    Args:
        partition_context: Object exposing ``partition_id`` and an awaitable
            ``update_checkpoint(event)`` method.
        event: Received event; its payload is read with
            ``body_as_str(encoding='UTF-8')``.
    """
    # Print the event data.
    body = event.body_as_str(encoding='UTF-8')
    print(
        f'Received the event: "{body}" from the partition with ID: '
        f'"{partition_context.partition_id}"'
    )

    # Update the checkpoint so that the program resumes from this event at
    # the next run.
    await partition_context.update_checkpoint(event)
18
async def main():
    """Receive events from the event hub, checkpointing to Azure Blob Storage.

    Builds a ``BlobCheckpointStore`` from ``storage_connection_str`` and
    ``container_name``, attaches it to an ``EventHubConsumerClient`` for the
    ``$Default`` consumer group, and awaits ``client.receive`` with
    ``on_event`` as the per-event callback.
    """
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str, container_name
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group="$Default",
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        # Call the receive method. Read from the beginning of the partition
        # (starting_position="-1").
        await client.receive(on_event=on_event, starting_position="-1")
34
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # current loop is deprecated on recent Python versions.
    asyncio.run(main())