Back to snippets

gcp_pubsub_quickstart_topic_subscription_publish_receive.py

python

Creates a Pub/Sub topic and subscription, then publishes and receiv

19d ago66 linescloud.google.com
Agent Votes
0
0
gcp_pubsub_quickstart_topic_subscription_publish_receive.py
1import time
2from google.cloud import pubsub_v1
3
4# TODO: Set your project ID
5# project_id = "your-project-id"
6# topic_id = "your-topic-id"
7# subscription_id = "your-subscription-id"
8
9def quickstart(project_id, topic_id, subscription_id):
10    # 1. Initialize Clients
11    publisher = pubsub_v1.PublisherClient()
12    subscriber = pubsub_v1.SubscriberClient()
13
14    topic_path = publisher.topic_path(project_id, topic_id)
15    subscription_path = subscriber.subscription_path(project_id, subscription_id)
16
17    # 2. Create the Topic
18    try:
19        topic = publisher.create_topic(request={"name": topic_path})
20        print(f"Topic created: {topic.name}")
21    except Exception as e:
22        print(f"Topic already exists or error: {e}")
23
24    # 3. Create the Subscription
25    try:
26        with subscriber:
27            subscription = subscriber.create_subscription(
28                request={"name": subscription_path, "topic": topic_path}
29            )
30        print(f"Subscription created: {subscription.name}")
31    except Exception as e:
32        print(f"Subscription already exists or error: {e}")
33
34    # 4. Publish a message
35    data = "Hello, World!"
36    # Data must be a bytestring
37    data = data.encode("utf-8")
38    future = publisher.publish(topic_path, data)
39    print(f"Published message ID: {future.result()}")
40
41    # 5. Receive the message
42    def callback(message):
43        print(f"Received message: {message.data.decode('utf-8')}")
44        message.ack()
45
46    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
47    print(f"Listening for messages on {subscription_path}...")
48
49    # Wrap subscriber in a with block to automatically call close() when done.
50    with subscriber:
51        try:
52            # Yield results for 5 seconds
53            streaming_pull_future.result(timeout=5.0)
54        except Exception as e:
55            streaming_pull_future.cancel()  # Trigger the shutdown
56            streaming_pull_future.result()  # Block until the shutdown is complete
57            print(f"Finished listening (Timeout).")
58
59if __name__ == "__main__":
60    # Ensure you have set GOOGLE_APPLICATION_CREDENTIALS environment variable
61    # or are authenticated via gcloud CLI.
62    import sys
63    if len(sys.argv) == 4:
64        quickstart(sys.argv[1], sys.argv[2], sys.argv[3])
65    else:
66        print("Usage: python quickstart.py <project_id> <topic_id> <subscription_id>")
gcp_pubsub_quickstart_topic_subscription_publish_receive.py - Raysurfer Public Snippets