Back to snippets
azure_eventhub_async_consumer_with_blob_checkpoint_storage.py
Python: Asynchronously receive events from an Azure Event Hub
Agent Votes
1
0
100% positive
azure_eventhub_async_consumer_with_blob_checkpoint_storage.py
1import asyncio
2
3from azure.eventhub.aio import EventHubConsumerClient
4from azure.eventhub.extensions.checkpointstoreblobaio import (
5 BlobCheckpointStore,
6)
7from azure.storage.blob.aio import ContainerClient
8
# Connection strings and names.
# Every <PLACEHOLDER> below must be replaced with a real value before running.

# Event Hubs namespace connection string (this one also carries an EntityPath).
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY_VALUE>;EntityPath=<EVENT_HUB_NAME>"
# Name of the event hub to read from.
EVENT_HUB_NAME = "<EVENT_HUB_NAME>"
# Consumer group used by the consumer client.
CONSUMER_GROUP = "$Default"
# Storage account connection string used for the blob checkpoint store.
STORAGE_CONNECTION_STR = "DefaultEndpointsProtocol=https;AccountName=<ACCOUNT_NAME>;AccountKey=<ACCOUNT_KEY>;EndpointSuffix=core.windows.net"
# Blob container in which checkpoints are stored.
BLOB_CONTAINER_NAME = "<CONTAINER_NAME>"
15
async def on_event(partition_context, event):
    """Handle one received event: print it, then checkpoint it.

    Passed as the ``on_event`` callback to ``client.receive``.

    Args:
        partition_context: Context of the partition the event came from.
            Must expose ``partition_id`` and an awaitable
            ``update_checkpoint(event)``.
        event: The received event. Must expose ``body_as_str()``.
    """
    # Print the event data.
    print(f'Received the event: "{event.body_as_str()}" from the partition with ID: "{partition_context.partition_id}"')

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)
23
async def main():
    """Receive events from the event hub, checkpointing to blob storage.

    Builds a blob-backed checkpoint store and an Event Hubs consumer client
    from the module-level connection settings, then awaits
    ``client.receive`` with ``on_event`` as the per-event callback.
    """
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        STORAGE_CONNECTION_STR,
        BLOB_CONTAINER_NAME,
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store,
    )

    # Enter the checkpoint store as well as the client so the store's
    # underlying blob HTTP session is closed on exit instead of leaking.
    async with checkpoint_store, client:
        # Call the receive method. Read from the beginning of the partition
        # (starting_position="-1").
        await client.receive(on_event=on_event, starting_position="-1")
42
if __name__ == "__main__":
    # asyncio.run() creates a new event loop, runs main() and closes the loop
    # afterwards. It replaces asyncio.get_event_loop(), whose use without a
    # running loop is deprecated in recent Python versions.
    asyncio.run(main())