Skip to content

Commit 628ee9a

Browse files
authored
Merge pull request #1950 from daspecster/enable-pubsub-bundling
Re-enable bundling (for consideration)
2 parents 7d29482 + da259a8 commit 628ee9a

File tree

3 files changed

+47
-8
lines changed

3 files changed

+47
-8
lines changed

gcloud/_testing.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,17 @@ def __init__(self, items, page_token):
5959
def next(self):
6060
items, self._items = self._items, None
6161
return items
62+
63+
64+
class _GAXBundlingEvent(object):
65+
66+
result = None
67+
68+
def __init__(self, result):
69+
self._result = result
70+
71+
def is_set(self):
72+
return self.result is not None
73+
74+
def wait(self, *_):
75+
self.result = self._result

gcloud/pubsub/_gax.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,17 +162,17 @@ def topic_publish(self, topic_path, messages):
162162
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
163163
exist
164164
"""
165-
options = CallOptions(is_bundling=False)
166165
message_pbs = [_message_pb_from_dict(message)
167166
for message in messages]
168167
try:
169-
result = self._gax_api.publish(topic_path, message_pbs,
170-
options=options)
168+
event = self._gax_api.publish(topic_path, message_pbs)
169+
if not event.is_set():
170+
event.wait()
171171
except GaxError as exc:
172172
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
173173
raise NotFound(topic_path)
174174
raise
175-
return result.message_ids
175+
return event.result.message_ids
176176

177177
def topic_list_subscriptions(self, topic_path, page_size=0,
178178
page_token=None):

gcloud/pubsub/test__gax.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,15 @@ def test_topic_delete_error(self):
204204

205205
def test_topic_publish_hit(self):
206206
import base64
207+
from gcloud._testing import _GAXBundlingEvent
207208
PAYLOAD = b'This is the message text'
208209
B64 = base64.b64encode(PAYLOAD).decode('ascii')
209210
MSGID = 'DEADBEEF'
210211
MESSAGE = {'data': B64, 'attributes': {}}
211212
response = _PublishResponsePB([MSGID])
212-
gax_api = _GAXPublisherAPI(_publish_response=response)
213+
event = _GAXBundlingEvent(response)
214+
event.wait() # already received result
215+
gax_api = _GAXPublisherAPI(_publish_response=event)
213216
api = self._makeOne(gax_api)
214217

215218
resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
@@ -220,7 +223,29 @@ def test_topic_publish_hit(self):
220223
message_pb, = message_pbs
221224
self.assertEqual(message_pb.data, B64)
222225
self.assertEqual(message_pb.attributes, {})
223-
self.assertEqual(options.is_bundling, False)
226+
self.assertEqual(options, None)
227+
228+
def test_topic_publish_hit_with_wait(self):
229+
import base64
230+
from gcloud._testing import _GAXBundlingEvent
231+
PAYLOAD = b'This is the message text'
232+
B64 = base64.b64encode(PAYLOAD).decode('ascii')
233+
MSGID = 'DEADBEEF'
234+
MESSAGE = {'data': B64, 'attributes': {}}
235+
response = _PublishResponsePB([MSGID])
236+
event = _GAXBundlingEvent(response)
237+
gax_api = _GAXPublisherAPI(_publish_response=event)
238+
api = self._makeOne(gax_api)
239+
240+
resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
241+
242+
self.assertEqual(resource, [MSGID])
243+
topic_path, message_pbs, options = gax_api._publish_called_with
244+
self.assertEqual(topic_path, self.TOPIC_PATH)
245+
message_pb, = message_pbs
246+
self.assertEqual(message_pb.data, B64)
247+
self.assertEqual(message_pb.attributes, {})
248+
self.assertEqual(options, None)
224249

225250
def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
226251
import base64
@@ -239,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
239264
message_pb, = message_pbs
240265
self.assertEqual(message_pb.data, B64)
241266
self.assertEqual(message_pb.attributes, {'foo': 'bar'})
242-
self.assertEqual(options.is_bundling, False)
267+
self.assertEqual(options, None)
243268

244269
def test_topic_publish_error(self):
245270
import base64
@@ -258,7 +283,7 @@ def test_topic_publish_error(self):
258283
message_pb, = message_pbs
259284
self.assertEqual(message_pb.data, B64)
260285
self.assertEqual(message_pb.attributes, {})
261-
self.assertEqual(options.is_bundling, False)
286+
self.assertEqual(options, None)
262287

263288
def test_topic_list_subscriptions_no_paging(self):
264289
from google.gax import INITIAL_PAGE

0 commit comments

Comments
 (0)