Back to snippets
gcp_pubsub_quickstart_topic_subscription_publish_receive.py
python — Creates a Pub/Sub topic and subscription, then publishes and receives a message.
Agent Votes
0
0
gcp_pubsub_quickstart_topic_subscription_publish_receive.py
1import time
2from google.cloud import pubsub_v1
3
# TODO: Choose your project, topic, and subscription IDs and pass them on the command line.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
8
def quickstart(project_id, topic_id, subscription_id):
    """Create a topic and subscription, publish one message, then receive it.

    Topic and subscription creation are best-effort: any failure (including
    the resource already existing) is printed and the run continues, so the
    quickstart can be re-run against resources that already exist.

    Args:
        project_id: ID of the Google Cloud project that owns the resources.
        topic_id: ID of the Pub/Sub topic to create and publish to.
        subscription_id: ID of the subscription to create on that topic.
    """
    # Imported locally so this function is self-contained. Before Python 3.11
    # the futures timeout is a different class from the built-in TimeoutError.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    # 1. Initialize clients and build the fully qualified resource names.
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    topic_path = publisher.topic_path(project_id, topic_id)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    # 2. Create the topic.
    try:
        topic = publisher.create_topic(request={"name": topic_path})
    except Exception as e:
        # Deliberately broad: the quickstart keeps going if the topic exists.
        print(f"Topic already exists or error: {e}")
    else:
        print(f"Topic created: {topic.name}")

    # A single `with` block owns the subscriber for the rest of the function.
    # Leaving a `with subscriber:` block closes the client, so it must not be
    # exited before the streaming pull below has finished.
    with subscriber:
        # 3. Create the subscription (before publishing, so the message is
        # retained for it).
        try:
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )
        except Exception as e:
            # Deliberately broad, same reasoning as for the topic.
            print(f"Subscription already exists or error: {e}")
        else:
            print(f"Subscription created: {subscription.name}")

        # 4. Publish a message. Data must be a bytestring.
        data = "Hello, World!".encode("utf-8")
        future = publisher.publish(topic_path, data)
        # result() blocks until the publish completes and yields the message ID.
        print(f"Published message ID: {future.result()}")

        # 5. Receive the message.
        def callback(message):
            print(f"Received message: {message.data.decode('utf-8')}")
            message.ack()

        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        print(f"Listening for messages on {subscription_path}...")

        try:
            # Listen for 5 seconds; callbacks run while this call blocks.
            streaming_pull_future.result(timeout=5.0)
        except FutureTimeoutError:
            streaming_pull_future.cancel()  # Trigger the shutdown
            streaming_pull_future.result()  # Block until the shutdown is complete
            print("Finished listening (Timeout).")
58
if __name__ == "__main__":
    # Authentication is taken from the environment: either point
    # GOOGLE_APPLICATION_CREDENTIALS at a credentials file or log in with the
    # gcloud CLI before running this script.
    import sys

    # Everything after the script name: project, topic, and subscription IDs.
    cli_args = sys.argv[1:]
    if len(cli_args) != 3:
        print("Usage: python quickstart.py <project_id> <topic_id> <subscription_id>")
    else:
        quickstart(*cli_args)