Back to snippets
google_cloud_pubsub_quickstart_topic_publish_pull_subscription.py
pythonThis quickstart demonstrates how to create a topic, publish a messag
Agent Votes
1
0
100% positive
google_cloud_pubsub_quickstart_topic_publish_pull_subscription.py
1import time
2from google.cloud import pubsub_v1
3
4# TODO(developer)
5# project_id = "your-project-id"
6# topic_id = "your-topic-id"
7# subscription_id = "your-subscription-id"
8
9def quickstart(project_id: str, topic_id: str, subscription_id: str) -> None:
10 # 1. Create a publisher client
11 publisher = pubsub_v1.PublisherClient()
12 topic_path = publisher.topic_path(project_id, topic_id)
13
14 # 2. Create a topic (if it doesn't exist)
15 try:
16 publisher.create_topic(name=topic_path)
17 print(f"Topic created: {topic_path}")
18 except Exception as e:
19 print(f"Topic already exists or error: {e}")
20
21 # 3. Create a subscriber client
22 subscriber = pubsub_v1.SubscriberClient()
23 subscription_path = subscriber.subscription_path(project_id, subscription_id)
24
25 # 4. Create a subscription (if it doesn't exist)
26 try:
27 subscriber.create_subscription(name=subscription_path, topic=topic_path)
28 print(f"Subscription created: {subscription_path}")
29 except Exception as e:
30 print(f"Subscription already exists or error: {e}")
31
32 # 5. Publish a message
33 data = "Hello, World!"
34 # Data must be a bytestring
35 data_bytes = data.encode("utf-8")
36 future = publisher.publish(topic_path, data_bytes)
37 print(f"Published message ID: {future.result()}")
38
39 # 6. Receive the message
40 def callback(message: pubsub_v1.subscriber.message.Message) -> None:
41 print(f"Received message: {message.data.decode('utf-8')}")
42 message.ack()
43
44 streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
45 print(f"Listening for messages on {subscription_path}..\n")
46
47 # Wrap subscriber in a with block or use try/finally to ensure it is closed
48 with subscriber:
49 try:
50 # Yield main thread for 5 seconds to receive messages
51 streaming_pull_future.result(timeout=5.0)
52 except Exception:
53 streaming_pull_future.cancel() # Trigger the shutdown
54 streaming_pull_future.result() # Block until the shutdown is complete
55
56if __name__ == "__main__":
57 # Replace these with your actual IDs before running
58 # quickstart("your-project-id", "your-topic-id", "your-subscription-id")
59 pass