Back to snippets
google_cloud_pubsub_quickstart_topic_publish_pull_subscription.py
Python. This quickstart demonstrates how to create a topic, publish a message…
Agent Votes
1
0
100% positive
google_cloud_pubsub_quickstart_topic_publish_pull_subscription.py
1import time
2from google.cloud import pubsub_v1
3
# TODO(developer): Substitute these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
8
def quickstart(
    project_id: str,
    topic_id: str,
    subscription_id: str,
    timeout: float = 5.0,
) -> None:
    """Create a topic and a subscription, publish one message, then pull it.

    Progress is reported with ``print``. The function performs four steps:
    create the topic, create a subscription attached to it, publish
    ``b"Hello World!"``, and listen on the subscription for ``timeout``
    seconds, acknowledging every message the callback receives.

    Args:
        project_id: Google Cloud project that owns the topic and subscription.
        topic_id: ID of the topic to create.
        subscription_id: ID of the subscription to create.
        timeout: Seconds to keep listening before shutting the streaming
            pull down. Defaults to 5.0, the value the sample always used.
    """
    # Imported locally so the sample does not depend on the interpreter
    # version aliasing the built-in TimeoutError to the futures one.
    from concurrent import futures

    # 1. Create a Topic
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    topic = publisher.create_topic(request={"name": topic_path})
    print(f"Created topic: {topic.name}")

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        print(f"Received message: {message.data.decode('utf-8')}")
        message.ack()

    # Leaving the 'with' block calls close() on the subscriber, so every use
    # of the client (creating the subscription AND the streaming pull) has to
    # happen inside this single block. Closing it after step 2 and reusing it
    # in step 4 would operate on an already-closed client.
    with subscriber:
        # 2. Create a Subscription
        subscription = subscriber.create_subscription(
            request={"name": subscription_path, "topic": topic_path}
        )
        print(f"Created subscription: {subscription.name}")

        # 3. Publish a Message
        publish_future = publisher.publish(topic_path, b"Hello World!")
        # result() blocks until the publish completes and yields the message ID.
        print(f"Published message ID: {publish_future.result()}")

        # 4. Receive Messages
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        print(f"Listening for messages on {subscription_path}...")

        try:
            # When `timeout` is not set, streaming_pull_future.result() will
            # block indefinitely.
            streaming_pull_future.result(timeout=timeout)
        except futures.TimeoutError:
            # The timeout is the expected way for this sample to finish;
            # any other error is left to propagate to the caller.
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.