From acd3a537ccf77c44b1c7d74901453f64d6638b95 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 25 Mar 2020 10:34:22 -0700 Subject: [PATCH 1/4] non-blocking publish --- pubsub/cloud-client/publisher.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index df7a9f23fd9..92cd55476f6 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -175,6 +175,8 @@ def callback(f): def publish_messages_with_batch_settings(project_id, topic_name): """Publishes multiple messages to a Pub/Sub topic with batch settings.""" # [START pubsub_publisher_batch_settings] + import threading + from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" @@ -183,17 +185,24 @@ def publish_messages_with_batch_settings(project_id, topic_name): # Configure the batch to publish as soon as there is one kilobyte # of data or one second has passed. batch_settings = pubsub_v1.types.BatchSettings( - max_bytes=1024, max_latency=1 # One kilobyte # One second + max_bytes=1024, # One kilobyte + max_latency=1, # One second ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path(project_id, topic_name) + # Resolve the publish future in a separate thread. + def callback(future): + message_id = future.result() + print(message_id) + for n in range(1, 10): data = u"Message number {}".format(n) # Data must be a bytestring data = data.encode("utf-8") future = publisher.publish(topic_path, data=data) - print(future.result()) + # Non-blocking. Allow the publisher client to batch multiple messages. + future.add_done_callback(callback) print("Published messages with batch settings.") # [END pubsub_publisher_batch_settings] From 412978071a63291b5c04ca647832e2d3ac12bc86 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 25 Mar 2020 10:37:55 -0700 Subject: [PATCH 2/4] remove unused lib --- pubsub/cloud-client/publisher.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 92cd55476f6..24e4fc055a3 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -175,8 +175,6 @@ def callback(f): def publish_messages_with_batch_settings(project_id, topic_name): """Publishes multiple messages to a Pub/Sub topic with batch settings.""" # [START pubsub_publisher_batch_settings] - import threading - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" From 0c57b2ecd2561fc4cd76742044c43d31173adaaf Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 25 Mar 2020 10:44:38 -0700 Subject: [PATCH 3/4] lint --- pubsub/cloud-client/publisher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 24e4fc055a3..ec8e8b4805a 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -183,8 +183,8 @@ def publish_messages_with_batch_settings(project_id, topic_name): # Configure the batch to publish as soon as there is one kilobyte # of data or one second has passed. batch_settings = pubsub_v1.types.BatchSettings( - max_bytes=1024, # One kilobyte - max_latency=1, # One second + max_bytes=1024, + max_latency=1 ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path(project_id, topic_name) From e52ff79021925385f617f02c503d71abf7bc77a5 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 25 Mar 2020 12:17:09 -0700 Subject: [PATCH 4/4] add defaults --- pubsub/cloud-client/publisher.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index ec8e8b4805a..6802ec85fc1 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -180,11 +180,12 @@ def publish_messages_with_batch_settings(project_id, topic_name): # TODO project_id = "Your Google Cloud Project ID" # TODO topic_name = "Your Pub/Sub topic name" - # Configure the batch to publish as soon as there is one kilobyte - # of data or one second has passed. + # Configure the batch to publish as soon as there is ten messages, + # one kilobyte of data, or one second has passed. batch_settings = pubsub_v1.types.BatchSettings( - max_bytes=1024, - max_latency=1 + max_messages=10, # default 100 + max_bytes=1024, # default 1 MB + max_latency=1, # default 10 ms ) publisher = pubsub_v1.PublisherClient(batch_settings) topic_path = publisher.topic_path(project_id, topic_name)