diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 1413313..450fb55 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -33,7 +33,11 @@ import collections -__version__ = '0.10.1' +__version__ = '0.10.2' + + +INITIAL_PAGE = object() +"""A placeholder for the page token passed into an initial paginated request.""" OPTION_INHERIT = object() @@ -48,7 +52,8 @@ class CallSettings(object): """Encapsulates the call settings for an API call.""" # pylint: disable=too-few-public-methods def __init__(self, timeout=30, retry=None, page_descriptor=None, - bundler=None, bundle_descriptor=None): + flatten_pages=None, page_token=None, bundler=None, + bundle_descriptor=None): """Constructor. Args: @@ -59,6 +64,14 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None, page_descriptor (:class:`PageDescriptor`): indicates the structure of page streaming to be performed. If set to None, page streaming is disabled. + flatten_pages (bool): If there is no ``page_descriptor``, this + attribute has no meaning. Otherwise, determines whether a page + streamed response should make the page structure transparent to + the user by flattening the repeated field in the returned + generator. + page_token (str): If there is no ``page_descriptor``, this attribute + has no meaning. Otherwise, determines the page token used in the + page streaming request. bundler (:class:`gax.bundling.Executor`): orchestrates bundling. If None, bundling is not performed. bundle_descriptor (:class:`BundleDescriptor`): indicates the @@ -67,12 +80,18 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None, self.timeout = timeout self.retry = retry self.page_descriptor = page_descriptor + self.flatten_pages = flatten_pages + self.page_token = page_token self.bundler = bundler self.bundle_descriptor = bundle_descriptor def merge(self, options): """Returns a new CallSettings merged from this and a CallOptions object. 
+ Note that if the CallOptions instance specifies a page_token, + the merged CallSettings will have ``flatten_pages`` disabled. This + permits toggling per-resource/per-page page streaming. + Args: options (:class:`CallOptions`): an instance whose values override those in this object. If None, ``merge`` returns a copy of this @@ -97,10 +116,12 @@ def merge(self, options): else: retry = options.retry - if options.is_page_streaming: - page_descriptor = self.page_descriptor + if options.page_token == OPTION_INHERIT: + flatten_pages = self.flatten_pages + page_token = self.page_token else: - page_descriptor = None + flatten_pages = False + page_token = options.page_token if options.is_bundling: bundler = self.bundler @@ -109,7 +130,8 @@ def merge(self, options): return CallSettings( timeout=timeout, retry=retry, - page_descriptor=page_descriptor, bundler=bundler, + page_descriptor=self.page_descriptor, page_token=page_token, + flatten_pages=flatten_pages, bundler=bundler, bundle_descriptor=self.bundle_descriptor) @@ -124,7 +146,7 @@ class CallOptions(object): """ # pylint: disable=too-few-public-methods def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT, - is_page_streaming=OPTION_INHERIT, is_bundling=False): + page_token=OPTION_INHERIT, is_bundling=False): """Constructor. Example: @@ -144,14 +166,17 @@ def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT, timeout (int): The client-side timeout for API calls. retry (:class:`RetryOptions`): determines whether and how to retry on transient errors. When set to None, the call will not retry. - is_page_streaming (bool): If set and the call is configured for page - streaming, page streaming is performed. + page_token (str): If set and the call is configured for page + streaming, page streaming is performed per-page, starting with + this page_token. Use ``INITIAL_PAGE`` for the first request. + If unset and the call is configured for page streaming, page + streaming is performed per-resource. 
is_bundling (bool): If set and the call is configured for bundling, bundling is performed. Bundling is always disabled by default. """ self.timeout = timeout self.retry = retry - self.is_page_streaming = is_page_streaming + self.page_token = page_token self.is_bundling = is_bundling @@ -340,3 +365,63 @@ def __new__(cls, request_byte_threshold, request_byte_limit, delay_threshold) + + +class PageIterator(object): + """An iterator over the pages of a page streaming API call. + + Provides access to the individual pages of the call, as well as the page + token. + + Attributes: + response: The full response message for the call most recently made, or + None if a call has not yet been made. + page_token: The page token to be passed in the request for the next call + to be made. + """ + # pylint: disable=too-few-public-methods + def __init__(self, api_call, page_descriptor, page_token, request, **kwargs): + """Constructor. + + Args: + api_call (callable[[req], resp]): an API call that is page + streaming. + page_descriptor (:class:`PageDescriptor`): indicates the structure + of page streaming to be performed. + page_token (str): The page token to be passed to API call request. + If no page token has yet been acquired, this field should be set + to ``INITIAL_PAGE``. + request (object): The request to be passed to the API call. The page + token field of the request is overwritten by the ``page_token`` + passed to the constructor, unless ``page_token`` is + ``INITIAL_PAGE``. + **kwargs: Arbitrary keyword arguments to be passed to the API call. + + Returns: + A PageIterator object. 
+ """ + self.response = None + self.page_token = page_token + self._func = api_call + self._page_descriptor = page_descriptor + self._request = request + self._kwargs = kwargs + self._done = False + + def __iter__(self): + return self + + def next(self): + """Retrieves the next page.""" + if self._done: + raise StopIteration + if self.page_token != INITIAL_PAGE: + setattr(self._request, + self._page_descriptor.request_page_token_field, + self.page_token) + response = self._func(self._request, **self._kwargs) + self.page_token = getattr( + response, self._page_descriptor.response_page_token_field) + if not self.page_token: + self._done = True + return getattr(response, self._page_descriptor.resource_field) diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index dc497f1..d3a5594 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -35,7 +35,7 @@ import time from . import (BackoffSettings, BundleOptions, bundling, CallSettings, config, - OPTION_INHERIT, RetryOptions) + OPTION_INHERIT, PageIterator, RetryOptions) from .errors import GaxError, RetryError _MILLIS_PER_SECOND = 1000 @@ -161,36 +161,46 @@ def inner(*args, **kwargs): return inner -def _page_streamable(a_func, - request_page_token_field, - response_page_token_field, - resource_field): +def _page_streamable(a_func, page_descriptor, page_token=None, + flatten_pages=True): """Creates a function that yields an iterable to performs page-streaming. Args: - a_func: an API call that is page streaming. - request_page_token_field: The field of the page token in the request. - response_page_token_field: The field of the next page token in the - response. - resource_field: The field to be streamed. + a_func (callable[[req], resp]): an API call that is page streaming. + page_descriptor (:class:`PageDescriptor`): indicates the structure + of page streaming to be performed. + page_token (str): Optional. 
If set and page streaming is over pages of + the response, indicates the page_token to be passed to the API call. + flatten_pages (bool): Optional. If set, the returned iterable is over + ``resource_field``; otherwise the returned iterable is over the pages + of the response, each of which is an iterable over ``resource_field``. Returns: - A function that returns an iterable over the specified field. + A function that returns an iterable. """ - def inner(*args, **kwargs): + def flattened(*args, **kwargs): """A generator that yields all the paged responses.""" request = args[0] while True: response = a_func(request, **kwargs) - for obj in getattr(response, resource_field): + for obj in getattr(response, page_descriptor.resource_field): yield obj - next_page_token = getattr(response, response_page_token_field) + next_page_token = getattr( + response, page_descriptor.response_page_token_field) if not next_page_token: break - setattr(request, request_page_token_field, next_page_token) + setattr(request, + page_descriptor.request_page_token_field, + next_page_token) - return inner + def unflattened(*args, **kwargs): + """A generator that yields individual pages.""" + request = args[0] + return PageIterator( + a_func, page_descriptor, page_token, request, **kwargs) + + return flattened if flatten_pages else unflattened def _construct_bundling(method_config, method_bundling_override, @@ -459,11 +469,15 @@ def create_api_call(func, settings): if settings.bundler and settings.bundle_descriptor: raise ValueError('The API call has incompatible settings: ' 'bundling and page streaming') + if settings.flatten_pages is None: + flatten_pages = True + else: + flatten_pages = settings.flatten_pages return _page_streamable( api_call, - settings.page_descriptor.request_page_token_field, - settings.page_descriptor.response_page_token_field, - settings.page_descriptor.resource_field) + settings.page_descriptor, + page_token=settings.page_token, + flatten_pages=flatten_pages) if 
settings.bundler and settings.bundle_descriptor: return _bundleable(api_call, settings.bundle_descriptor, diff --git a/test/test_api_callable.py b/test/test_api_callable.py index 3329a05..db8c4ea 100644 --- a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -36,7 +36,7 @@ from google.gax import ( api_callable, bundling, BackoffSettings, BundleDescriptor, BundleOptions, - CallSettings, PageDescriptor, RetryOptions) + CallSettings, INITIAL_PAGE, PageDescriptor, RetryOptions) from google.gax.errors import GaxError, RetryError @@ -267,6 +267,7 @@ def test_page_streaming(self): # integers, returning `page_size` integers with each call and using # the next integer to return as the page token, until `pages_to_stream` # pages have been returned. + # pylint:disable=too-many-locals page_size = 3 pages_to_stream = 5 @@ -284,16 +285,16 @@ def __init__(self, nums=(), next_page_token=0): 'page_token', 'next_page_token', 'nums') def grpc_return_value(request, *dummy_args, **dummy_kwargs): - if (request.page_token > 0 and - request.page_token < page_size * pages_to_stream): + start = int(request.page_token) if request.page_token else 0 + if start > 0 and start < page_size * pages_to_stream: return PageStreamingResponse( - nums=iter(range(request.page_token, - request.page_token + page_size)), - next_page_token=request.page_token + page_size) - elif request.page_token >= page_size * pages_to_stream: + nums=list(range(start, + start + page_size)), + next_page_token=start + page_size) + elif start >= page_size * pages_to_stream: return PageStreamingResponse() else: - return PageStreamingResponse(nums=iter(range(page_size)), + return PageStreamingResponse(nums=list(range(page_size)), next_page_token=page_size) with mock.patch('grpc.framework.crust.implementations.' 
@@ -301,10 +302,39 @@ def grpc_return_value(request, *dummy_args, **dummy_kwargs): mock_grpc.side_effect = grpc_return_value settings = CallSettings( page_descriptor=fake_grpc_func_descriptor, timeout=0) - my_callable = api_callable.create_api_call(mock_grpc, settings=settings) + my_callable = api_callable.create_api_call( + mock_grpc, settings=settings) self.assertEqual(list(my_callable(PageStreamingRequest())), list(range(page_size * pages_to_stream))) + unflattened_settings = CallSettings( + page_descriptor=fake_grpc_func_descriptor, timeout=0, + flatten_pages=False, page_token=INITIAL_PAGE) + unflattened_callable = api_callable.create_api_call( + mock_grpc, settings=unflattened_settings) + # Expect a list of pages_to_stream pages, each of size page_size, + # plus one empty page + expected = [list(range(page_size * n, page_size * (n + 1))) + for n in range(pages_to_stream)] + [()] + self.assertEqual(list(unflattened_callable(PageStreamingRequest())), + expected) + + pages_already_read = 2 + explicit_page_token_settings = CallSettings( + page_descriptor=fake_grpc_func_descriptor, timeout=0, + flatten_pages=False, + page_token=str(page_size * pages_already_read)) + explicit_page_token_callable = api_callable.create_api_call( + mock_grpc, settings=explicit_page_token_settings) + # Expect a list of pages_to_stream pages, each of size page_size, + # plus one empty page, minus the pages_already_read + expected = [list(range(page_size * n, page_size * (n + 1))) + for n in range(pages_already_read, pages_to_stream)] + expected += [()] + self.assertEqual( + list(explicit_page_token_callable(PageStreamingRequest())), + expected) + def test_bundling_page_streaming_error(self): settings = CallSettings( page_descriptor=object(), bundle_descriptor=object(), diff --git a/test/test_gax.py b/test/test_gax.py index 4ff84da..2f1e211 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -35,7 +35,8 @@ import unittest2 from google.gax import ( - BundleOptions, CallOptions, 
CallSettings, OPTION_INHERIT, RetryOptions) + BundleOptions, CallOptions, CallSettings, INITIAL_PAGE, OPTION_INHERIT, + RetryOptions) class TestBundleOptions(unittest2.TestCase): @@ -63,7 +64,7 @@ def test_call_options_simple(self): options = CallOptions(timeout=23) self.assertEqual(options.timeout, 23) self.assertEqual(options.retry, OPTION_INHERIT) - self.assertEqual(options.is_page_streaming, OPTION_INHERIT) + self.assertEqual(options.page_token, OPTION_INHERIT) def test_settings_merge_options1(self): retry = RetryOptions(None, None) @@ -85,11 +86,15 @@ def test_settings_merge_options2(self): def test_settings_merge_options_page_streaming(self): retry = RetryOptions(None, None) - options = CallOptions(timeout=46, is_page_streaming=False) - settings = CallSettings(timeout=9, retry=retry) + page_descriptor = object() + options = CallOptions(timeout=46, page_token=INITIAL_PAGE) + settings = CallSettings(timeout=9, retry=retry, + page_descriptor=page_descriptor) final = settings.merge(options) self.assertEqual(final.timeout, 46) - self.assertIsNone(final.page_descriptor) + self.assertEqual(final.page_descriptor, page_descriptor) + self.assertEqual(final.page_token, INITIAL_PAGE) + self.assertFalse(final.flatten_pages) self.assertEqual(final.retry, retry) def test_settings_merge_none(self):