diff --git a/pubsub/cloud-client/README.rst b/pubsub/cloud-client/README.rst index c30fd190a23..f27f9438ea9 100644 --- a/pubsub/cloud-client/README.rst +++ b/pubsub/cloud-client/README.rst @@ -74,7 +74,7 @@ To run this sample: .. code-block:: bash - $ python publisher.py + $ python publisher.py --help usage: publisher.py [-h] project_id @@ -124,11 +124,11 @@ To run this sample: .. code-block:: bash - $ python subscriber.py + $ python subscriber.py --help usage: subscriber.py [-h] project_id - {list_in_topic,list_in_project,create,create-push,delete,update,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen_for_errors} + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} ... This application demonstrates how to perform basic operations on @@ -139,15 +139,21 @@ To run this sample: positional arguments: project_id Your Google Cloud project ID - {list_in_topic,list_in_project,create,create-push,delete,update,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen_for_errors} - list_in_topic Lists all subscriptions for a given topic. - list_in_project Lists all subscriptions in the current project. + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + list-in-topic Lists all subscriptions for a given topic. + list-in-project Lists all subscriptions in the current project. create Create a new pull subscription on the given topic. + create-with-dead-letter-policy + Create a subscription with dead letter policy. create-push Create a new push subscription on the given topic. delete Deletes an existing Pub/Sub topic. - update Updates an existing Pub/Sub subscription's push + update-push Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a subscription, such as its topic, are not modifiable. + update-dead-letter-policy + Update a subscription's dead letter policy. + remove-dead-letter-policy + Remove dead letter policy from a subscription. receive Receives messages from a pull subscription. receive-custom-attributes Receives messages from a pull subscription. @@ -158,8 +164,9 @@ To run this sample: Pulling messages synchronously. receive-synchronously-with-lease Pulling messages synchronously with lease management - listen_for_errors Receives messages and catches errors from a pull + listen-for-errors Receives messages and catches errors from a pull subscription. + receive-messages-with-delivery-attempts optional arguments: -h, --help show this help message and exit diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 6802ec85fc1..9e7820fbf30 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -29,7 +29,8 @@ def list_topics(project_id): # [START pubsub_list_topics] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" + # TODO(developer) + # project_id = "your-project-id" publisher = pubsub_v1.PublisherClient() project_path = publisher.project_path(project_id) @@ -45,8 +46,9 @@ def create_topic(project_id, topic_name): # [START pubsub_create_topic] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -63,8 +65,9 @@ def delete_topic(project_id, topic_name): # [START pubsub_delete_topic] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -81,8 +84,9 @@ def publish_messages(project_id, topic_name): # [START pubsub_publish] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() # The `topic_path` method creates a fully qualified identifier @@ -108,8 +112,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name): # [START pubsub_publish_custom_attributes] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -135,8 +140,9 @@ def publish_messages_with_error_handler(project_id, topic_name): from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -177,8 +183,9 @@ def publish_messages_with_batch_settings(project_id, topic_name): # [START pubsub_publisher_batch_settings] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" # Configure the batch to publish as soon as there is ten messages, # one kilobyte of data, or one second has passed. @@ -212,8 +219,9 @@ def publish_messages_with_retry_settings(project_id, topic_name): # [START pubsub_publisher_retry_settings] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" # Configure the retry settings. Defaults will be overwritten. retry_settings = { @@ -267,8 +275,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): if __name__ == "__main__": parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("project_id", help="Your Google Cloud project ID") @@ -281,9 +288,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_name") - publish_parser = subparsers.add_parser( - "publish", help=publish_messages.__doc__ - ) + publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) publish_parser.add_argument("topic_name") publish_with_custom_attributes_parser = subparsers.add_parser( @@ -293,8 +298,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): publish_with_custom_attributes_parser.add_argument("topic_name") publish_with_error_handler_parser = subparsers.add_parser( - "publish-with-error-handler", - help=publish_messages_with_error_handler.__doc__, + "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, ) publish_with_error_handler_parser.add_argument("topic_name") @@ -321,9 +325,7 @@ def publish_messages_with_retry_settings(project_id, topic_name): elif args.command == "publish": publish_messages(args.project_id, args.topic_name) elif args.command == "publish-with-custom-attributes": - publish_messages_with_custom_attributes( - args.project_id, args.topic_name - ) + publish_messages_with_custom_attributes(args.project_id, args.topic_name) elif args.command == "publish-with-error-handler": publish_messages_with_error_handler(args.project_id, args.topic_name) elif args.command == "publish-with-batch-settings": diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index e22efc7b16f..b5af760aed4 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -29,8 +29,9 @@ def list_subscriptions_in_topic(project_id, topic_name): # [START pubsub_list_topic_subscriptions] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) @@ -45,7 +46,8 @@ def list_subscriptions_in_project(project_id): # [START pubsub_list_subscriptions] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" + # TODO(developer) + # project_id = "your-project-id" subscriber = pubsub_v1.SubscriberClient() project_path = subscriber.project_path(project_id) @@ -62,19 +64,16 @@ def create_subscription(project_id, topic_name, subscription_name): # [START pubsub_create_pull_subscription] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) - subscription = subscriber.create_subscription( - subscription_path, topic_path - ) + subscription = subscriber.create_subscription(subscription_path, topic_path) print("Subscription created: {}".format(subscription)) @@ -82,23 +81,68 @@ def create_subscription(project_id, topic_name, subscription_name): # [END pubsub_create_pull_subscription] -def create_push_subscription( - project_id, topic_name, subscription_name, endpoint +def create_subscription_with_dead_letter_topic( + project_id, topic_name, subscription_name, dead_letter_topic_name ): + """Create a subscription with dead letter policy.""" + # [START pubsub_dead_letter_create_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + # TODO(developer) + # project_id = "your-project-id" + # endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_name = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_name = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_name = "your-dead-letter-topic-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_name) + subscription_path = subscriber.subscription_path(project_id, subscription_name) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_name) + + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 + ) + + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, dead_letter_policy=dead_letter_policy + ) + + print("Subscription created: {}".format(subscription.name)) + print( + "It will forward dead letter messages to: {}".format( + subscription.dead_letter_policy.dead_letter_topic + ) + ) + print( + "After {} delivery attempts.".format( + subscription.dead_letter_policy.max_delivery_attempts + ) + ) + # [END pubsub_dead_letter_create_subscription] + + +def create_push_subscription(project_id, topic_name, subscription_name, endpoint): """Create a new push subscription on the given topic.""" # [START pubsub_create_push_subscription] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" + # subscription_name = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" subscriber = pubsub_v1.SubscriberClient() topic_path = subscriber.topic_path(project_id, topic_name) - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) @@ -118,13 +162,12 @@ def delete_subscription(project_id, subscription_name): # [START pubsub_delete_subscription] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) subscriber.delete_subscription(subscription_path) @@ -134,7 +177,7 @@ def delete_subscription(project_id, subscription_name): # [END pubsub_delete_subscription] -def update_subscription(project_id, subscription_name, endpoint): +def update_push_subscription(project_id, topic_name, subscription_name, endpoint): """ Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a subscription, such as @@ -143,26 +186,24 @@ def update_subscription(project_id, subscription_name, endpoint): # [START pubsub_update_push_configuration] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO topic_name = "Your Pub/Sub topic name" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer) + # project_id = "your-project-id" + # topic_name = "your-topic-id" + # subscription_name = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) subscription = pubsub_v1.types.Subscription( - name=subscription_path, push_config=push_config + name=subscription_path, topic=topic_name, push_config=push_config ) update_mask = {"paths": {"push_config"}} - subscriber.update_subscription(subscription, update_mask) - result = subscriber.get_subscription(subscription_path) + result = subscriber.update_subscription(subscription, update_mask) print("Subscription updated: {}".format(subscription_path)) print("New endpoint for subscription is: {}".format(result.push_config)) @@ -171,31 +212,126 @@ def update_subscription(project_id, subscription_name, endpoint): # [END pubsub_update_push_configuration] +def update_subscription_with_dead_letter_policy( + project_id, topic_name, subscription_name, dead_letter_topic_name +): + """Update a subscription's dead letter policy.""" + # [START pubsub_dead_letter_update_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_name = "your-topic-name" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_name = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_name = "your-dead-letter-topic-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_name) + subscription_path = subscriber.subscription_path(project_id, subscription_name) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_name) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before the update: {}".format(subscription_before_update)) + + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) + + # Construct a dead letter policy you expect to have after the update. + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 + ) + + # Construct the subscription with the dead letter policy you expect to have + # after the update. Here, values in the required fields (name, topic) help + # identify the subscription. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, + ) + + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After the update: {}".format(subscription_after_update)) + # [END pubsub_dead_letter_update_subscription] + return subscription_after_update + + +def remove_dead_letter_policy(project_id, topic_name, subscription_name): + """Remove dead letter policy from a subscription.""" + # [START pubsub_dead_letter_remove] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_name = "your-topic-name" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_name = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_name) + subscription_path = subscriber.subscription_path(project_id, subscription_name) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before removing the policy: {}".format(subscription_before_update)) + + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask( + paths=[ + "dead_letter_policy.dead_letter_topic", + "dead_letter_policy.max_delivery_attempts", + ] + ) + + # Construct the subscription (without any dead letter policy) that you + # expect to have after the update. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path + ) + + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After removing the policy: {}".format(subscription_after_update)) + # [END pubsub_dead_letter_remove] + return subscription_after_update + + def receive_messages(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() # The `subscription_path` method creates a fully qualified identifier # in the form `projects/{project_id}/subscriptions/{subscription_name}` - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -204,7 +340,7 @@ def callback(message): # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) - except: # noqa + except TimeoutError: streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] @@ -216,17 +352,17 @@ def receive_messages_with_custom_attributes( """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] # [START pubsub_subscriber_async_pull_custom_attributes] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message.data)) @@ -237,9 +373,7 @@ def callback(message): print("{}: {}".format(key, value)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -248,28 +382,26 @@ def callback(message): # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) - except: # noqa + except TimeoutError: streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull_custom_attributes] # [END pubsub_subscriber_sync_pull_custom_attributes] -def receive_messages_with_flow_control( - project_id, subscription_name, timeout=None -): +def receive_messages_with_flow_control(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] + from concurrent.futures import TimeoutError from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message.data)) @@ -289,7 +421,7 @@ def callback(message): # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) - except: # noqa + except TimeoutError: streaming_pull_future.cancel() # [END pubsub_subscriber_flow_settings] @@ -299,13 +431,12 @@ def synchronous_pull(project_id, subscription_name): # [START pubsub_subscriber_sync_pull] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) NUM_MESSAGES = 3 @@ -340,13 +471,12 @@ def synchronous_pull_with_lease_management(project_id, subscription_name): from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) NUM_MESSAGES = 2 ACK_DEADLINE = 30 @@ -385,15 +515,11 @@ def worker(msg): if process.is_alive(): # `ack_deadline_seconds` must be between 10 to 600. subscriber.modify_ack_deadline( - subscription_path, - [ack_id], - ack_deadline_seconds=ACK_DEADLINE, + subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, ) logger.info( "{}: Reset ack deadline for {} for {}s".format( - time.strftime("%X", time.gmtime()), - msg_data, - ACK_DEADLINE, + time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE, ) ) @@ -426,23 +552,20 @@ def listen_for_errors(project_id, subscription_name, timeout=None): # [START pubsub_subscriber_error_listener] from google.cloud import pubsub_v1 - # TODO project_id = "Your Google Cloud Project ID" - # TODO subscription_name = "Your Pubsub subscription name" - # TODO timeout = 5.0 # "How long the subscriber should listen for - # messages in seconds" + # TODO(developer) + # project_id = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path( - project_id, subscription_name - ) + subscription_path = subscriber.subscription_path(project_id, subscription_name) def callback(message): print("Received message: {}".format(message)) message.ack() - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback - ) + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) print("Listening for messages on {}..\n".format(subscription_path)) # Wrap subscriber in a 'with' block to automatically call close() when done. @@ -461,29 +584,67 @@ def callback(message): # [END pubsub_subscriber_error_listener] +def receive_messages_with_delivery_attempts( + project_id, subscription_name, timeout=None +): + # [START pubsub_dead_letter_delivery_attempt] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_name = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_name) + + def callback(message): + print("Received message: {}".format(message)) + print("With delivery attempts: {}".format(message.delivery_attempt)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_dead_letter_delivery_attempt] + + if __name__ == "__main__": parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("project_id", help="Your Google Cloud project ID") subparsers = parser.add_subparsers(dest="command") list_in_topic_parser = subparsers.add_parser( - "list_in_topic", help=list_subscriptions_in_topic.__doc__ + "list-in-topic", help=list_subscriptions_in_topic.__doc__ ) list_in_topic_parser.add_argument("topic_name") list_in_project_parser = subparsers.add_parser( - "list_in_project", help=list_subscriptions_in_project.__doc__ + "list-in-project", help=list_subscriptions_in_project.__doc__ ) - create_parser = subparsers.add_parser( - "create", help=create_subscription.__doc__ - ) + create_parser = subparsers.add_parser("create", help=create_subscription.__doc__) create_parser.add_argument("topic_name") create_parser.add_argument("subscription_name") + create_with_dead_letter_policy_parser = subparsers.add_parser( + "create-with-dead-letter-policy", + help=create_subscription_with_dead_letter_topic.__doc__, + ) + create_with_dead_letter_policy_parser.add_argument("topic_name") + create_with_dead_letter_policy_parser.add_argument("subscription_name") + create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_name") + create_push_parser = subparsers.add_parser( "create-push", help=create_push_subscription.__doc__ ) @@ -491,20 +652,31 @@ def callback(message): create_push_parser.add_argument("subscription_name") create_push_parser.add_argument("endpoint") - delete_parser = subparsers.add_parser( - "delete", help=delete_subscription.__doc__ - ) + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) delete_parser.add_argument("subscription_name") - update_parser = subparsers.add_parser( - "update", help=update_subscription.__doc__ + update_push_parser = subparsers.add_parser( + "update-push", help=update_push_subscription.__doc__ + ) + update_push_parser.add_argument("topic_name") + update_push_parser.add_argument("subscription_name") + update_push_parser.add_argument("endpoint") + + update_dead_letter_policy_parser = subparsers.add_parser( + "update-dead-letter-policy", + help=update_subscription_with_dead_letter_policy.__doc__, ) - update_parser.add_argument("subscription_name") - update_parser.add_argument("endpoint") + update_dead_letter_policy_parser.add_argument("topic_name") + update_dead_letter_policy_parser.add_argument("subscription_name") + update_dead_letter_policy_parser.add_argument("dead_letter_topic_name") - receive_parser = subparsers.add_parser( - "receive", help=receive_messages.__doc__ + remove_dead_letter_policy_parser = subparsers.add_parser( + "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ ) + remove_dead_letter_policy_parser.add_argument("topic_name") + remove_dead_letter_policy_parser.add_argument("subscription_name") + + receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) receive_parser.add_argument("subscription_name") receive_parser.add_argument("--timeout", default=None, type=float) @@ -521,9 +693,7 @@ def callback(message): "receive-flow-control", help=receive_messages_with_flow_control.__doc__ ) receive_with_flow_control_parser.add_argument("subscription_name") - receive_with_flow_control_parser.add_argument( - "--timeout", default=None, type=float - ) + receive_with_flow_control_parser.add_argument("--timeout", default=None, type=float) synchronous_pull_parser = subparsers.add_parser( "receive-synchronously", help=synchronous_pull.__doc__ @@ -534,40 +704,58 @@ def callback(message): "receive-synchronously-with-lease", help=synchronous_pull_with_lease_management.__doc__, ) - synchronous_pull_with_lease_management_parser.add_argument( - "subscription_name" - ) + synchronous_pull_with_lease_management_parser.add_argument("subscription_name") listen_for_errors_parser = subparsers.add_parser( - "listen_for_errors", help=listen_for_errors.__doc__ + "listen-for-errors", help=listen_for_errors.__doc__ ) listen_for_errors_parser.add_argument("subscription_name") - listen_for_errors_parser.add_argument( + listen_for_errors_parser.add_argument("--timeout", default=None, type=float) + + receive_messages_with_delivery_attempts_parser = subparsers.add_parser( + "receive-messages-with-delivery-attempts", + help=receive_messages_with_delivery_attempts.__doc__, + ) + receive_messages_with_delivery_attempts_parser.add_argument("subscription_name") + receive_messages_with_delivery_attempts_parser.add_argument( "--timeout", default=None, type=float ) args = parser.parse_args() - if args.command == "list_in_topic": + if args.command == "list-in-topic": list_subscriptions_in_topic(args.project_id, args.topic_name) - elif args.command == "list_in_project": + elif args.command == "list-in-project": list_subscriptions_in_project(args.project_id) elif args.command == "create": - create_subscription( - args.project_id, args.topic_name, args.subscription_name + create_subscription(args.project_id, args.topic_name, args.subscription_name) + elif args.command == "create-with-dead-letter-policy": + create_subscription_with_dead_letter_topic( + args.project_id, + args.topic_name, + args.subscription_name, + args.dead_letter_topic_name, ) elif args.command == "create-push": create_push_subscription( + args.project_id, args.topic_name, args.subscription_name, args.endpoint, + ) + elif args.command == "delete": + delete_subscription(args.project_id, args.subscription_name) + elif args.command == "update-push": + update_push_subscription( + args.project_id, args.topic_name, args.subscription_name, args.endpoint, + ) + elif args.command == "update-dead-letter-policy": + update_subscription_with_dead_letter_policy( args.project_id, args.topic_name, args.subscription_name, - args.endpoint, + args.dead_letter_topic_name, ) - elif args.command == "delete": - delete_subscription(args.project_id, args.subscription_name) - elif args.command == "update": - update_subscription( - args.project_id, args.subscription_name, args.endpoint + elif args.command == "remove-dead-letter-policy": + remove_dead_letter_policy( + args.project_id, args.topic_name, args.subscription_name ) elif args.command == "receive": receive_messages(args.project_id, args.subscription_name, args.timeout) @@ -582,10 +770,10 @@ def callback(message): elif args.command == "receive-synchronously": synchronous_pull(args.project_id, args.subscription_name) elif args.command == "receive-synchronously-with-lease": - synchronous_pull_with_lease_management( - args.project_id, args.subscription_name - ) - elif args.command == "listen_for_errors": - listen_for_errors( + synchronous_pull_with_lease_management(args.project_id, args.subscription_name) + elif args.command == "listen-for-errors": + listen_for_errors(args.project_id, args.subscription_name, args.timeout) + elif args.command == "receive-messages-with-delivery-attempts": + receive_messages_with_delivery_attempts( args.project_id, args.subscription_name, args.timeout ) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 1c9520866f1..6b90396f942 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -24,9 +24,11 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] TOPIC = "subscription-test-topic-" + UUID +DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID +SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) @@ -41,13 +43,27 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - subscription = publisher_client.get_topic(topic_path) + topic = publisher_client.get_topic(topic_path) except: # noqa - subscription = publisher_client.create_topic(topic_path) + topic = publisher_client.create_topic(topic_path) - yield subscription.name + yield topic.name + + publisher_client.delete_topic(topic.name) + + +@pytest.fixture(scope="module") +def dead_letter_topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + + try: + dead_letter_topic = publisher_client.get_topic(topic_path) + except: # noqa + dead_letter_topic = publisher_client.create_topic(topic_path) + + yield dead_letter_topic.name - publisher_client.delete_topic(subscription.name) + publisher_client.delete_topic(dead_letter_topic.name) @pytest.fixture(scope="module") @@ -59,9 +75,7 @@ def subscriber_client(): @pytest.fixture(scope="module") def subscription_admin(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ADMIN - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -75,9 +89,7 @@ def subscription_admin(subscriber_client, topic): @pytest.fixture(scope="module") def subscription_sync(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_SYNC - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -93,9 +105,23 @@ def subscription_sync(subscriber_client, topic): @pytest.fixture(scope="module") def subscription_async(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ASYNC - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +@pytest.fixture(scope="module") +def subscription_dlq(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) try: subscription = subscriber_client.get_subscription(subscription_path) @@ -130,9 +156,7 @@ def eventually_consistent_test(): def test_create(subscriber_client): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ADMIN - ) + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: subscriber_client.delete_subscription(subscription_path) @@ -148,19 +172,36 @@ def eventually_consistent_test(): eventually_consistent_test() -def test_create_push(subscriber_client): - subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ADMIN - ) +def test_create_subscription_with_dead_letter_policy( + subscriber_client, publisher_client, topic, dead_letter_topic, capsys +): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) + dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + try: subscriber_client.delete_subscription(subscription_path) except Exception: pass - subscriber.create_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT + subscriber.create_subscription_with_dead_letter_topic( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC ) + out, _ = capsys.readouterr() + assert "Subscription created: " + subscription_path in out + assert "It will forward dead letter messages to: " + dead_letter_topic_path in out + assert "After 10 delivery attempts." in out + + +def test_create_push(subscriber_client): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): assert subscriber_client.get_subscription(subscription_path) @@ -169,12 +210,25 @@ def eventually_consistent_test(): def test_update(subscriber_client, subscription_admin, capsys): - subscriber.update_subscription(PROJECT, SUBSCRIPTION_ADMIN, NEW_ENDPOINT) + subscriber.update_push_subscription( + PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT + ) out, _ = capsys.readouterr() assert "Subscription updated" in out +def test_update_dead_letter_policy( + subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _ = subscriber.update_subscription_with_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + out, _ = capsys.readouterr() + assert "max_delivery_attempts: 20" in out + + def test_delete(subscriber_client, subscription_admin): subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) @@ -212,9 +266,7 @@ def test_receive_with_custom_attributes( _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_ASYNC, 5 - ) + subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "message" in out @@ -222,15 +274,11 @@ def test_receive_with_custom_attributes( assert "python-sample" in out -def test_receive_with_flow_control( - publisher_client, topic, subscription_async, capsys -): +def test_receive_with_flow_control(publisher_client, topic, subscription_async, capsys): _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION_ASYNC, 5 - ) + subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "Listening" in out @@ -238,9 +286,7 @@ def test_receive_with_flow_control( assert "message" in out -def test_receive_synchronously( - publisher_client, topic, subscription_sync, capsys -): +def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys): _publish_messages(publisher_client, topic) subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) @@ -254,17 +300,13 @@ def test_receive_synchronously_with_lease( ): _publish_messages(publisher_client, topic) - subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_SYNC - ) + subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION_SYNC) out, _ = capsys.readouterr() assert "Done." in out -def test_listen_for_errors( - publisher_client, topic, subscription_async, capsys -): +def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): _publish_messages(publisher_client, topic) @@ -274,3 +316,26 @@ def test_listen_for_errors( assert "Listening" in out assert subscription_async in out assert "threw an exception" in out + + +def test_receive_with_delivery_attempts( + publisher_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_delivery_attempts(PROJECT, SUBSCRIPTION_DLQ, 10) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_dlq in out + assert "Received message: " in out + assert "message 4" in out + assert "With delivery attempts: " in out + + +def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): + subscription_after_update = subscriber.remove_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ + ) + + assert subscription_after_update.dead_letter_policy.dead_letter_topic == ""