Skip to content
This repository was archived by the owner on Mar 20, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 95 additions & 10 deletions google/gax/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
import collections


__version__ = '0.10.1'
__version__ = '0.10.2'


INITIAL_PAGE = object()
"""A placeholder for the page token passed into an initial paginated request."""


OPTION_INHERIT = object()
Expand All @@ -48,7 +52,8 @@ class CallSettings(object):
"""Encapsulates the call settings for an API call."""
# pylint: disable=too-few-public-methods
def __init__(self, timeout=30, retry=None, page_descriptor=None,
bundler=None, bundle_descriptor=None):
flatten_pages=None, page_token=None, bundler=None,
bundle_descriptor=None):
"""Constructor.

Args:
Expand All @@ -59,6 +64,14 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None,
page_descriptor (:class:`PageDescriptor`): indicates the structure
of page streaming to be performed. If set to None, page streaming
is disabled.
flatten_pages (bool): If there is no ``page_descriptor``, this
attrbute has no meaning. Otherwise, determines whether a page
streamed response should make the page structure transparent to
the user by flattening the repeated field in the returned
generator.
page_token (str): If there is no ``page_descriptor``, this attribute
has no meaning. Otherwise, determines the page token used in the
page streaming request.
bundler (:class:`gax.bundling.Executor`): orchestrates bundling. If
None, bundling is not performed.
bundle_descriptor (:class:`BundleDescriptor`): indicates the
Expand All @@ -67,12 +80,18 @@ def __init__(self, timeout=30, retry=None, page_descriptor=None,
self.timeout = timeout
self.retry = retry
self.page_descriptor = page_descriptor
self.flatten_pages = flatten_pages
self.page_token = page_token
self.bundler = bundler
self.bundle_descriptor = bundle_descriptor

def merge(self, options):
"""Returns a new CallSettings merged from this and a CallOptions object.

Note that passing if the CallOptions instance specifies a page_token,
the merged CallSettings will have ``flatten_pages`` disabled. This
permits toggling per-resource/per-page page streaming.

Args:
options (:class:`CallOptions`): an instance whose values override
those in this object. If None, ``merge`` returns a copy of this
Expand All @@ -97,10 +116,12 @@ def merge(self, options):
else:
retry = options.retry

if options.is_page_streaming:
page_descriptor = self.page_descriptor
if options.page_token == OPTION_INHERIT:
flatten_pages = self.flatten_pages
page_token = self.page_token
else:
page_descriptor = None
flatten_pages = False
page_token = options.page_token

if options.is_bundling:
bundler = self.bundler
Expand All @@ -109,7 +130,8 @@ def merge(self, options):

return CallSettings(
timeout=timeout, retry=retry,
page_descriptor=page_descriptor, bundler=bundler,
page_descriptor=self.page_descriptor, page_token=page_token,
flatten_pages=flatten_pages, bundler=bundler,
bundle_descriptor=self.bundle_descriptor)


Expand All @@ -124,7 +146,7 @@ class CallOptions(object):
"""
# pylint: disable=too-few-public-methods
def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT,
is_page_streaming=OPTION_INHERIT, is_bundling=False):
page_token=OPTION_INHERIT, is_bundling=False):
"""Constructor.

Example:
Expand All @@ -144,14 +166,17 @@ def __init__(self, timeout=OPTION_INHERIT, retry=OPTION_INHERIT,
timeout (int): The client-side timeout for API calls.
retry (:class:`RetryOptions`): determines whether and how to retry
on transient errors. When set to None, the call will not retry.
is_page_streaming (bool): If set and the call is configured for page
streaming, page streaming is performed.
page_token (str): If set and the call is configured for page
streaming, page streaming is performed per-page, starting with
this page_token. Use ``INITIAL_PAGE`` for the first request.
If unset and the call is configured for page streaming, page
streaming is performed per-resource.
is_bundling (bool): If set and the call is configured for bundling,
bundling is performed. Bundling is always disabled by default.
"""
self.timeout = timeout
self.retry = retry
self.is_page_streaming = is_page_streaming
self.page_token = page_token
self.is_bundling = is_bundling


Expand Down Expand Up @@ -340,3 +365,63 @@ def __new__(cls,
request_byte_threshold,
request_byte_limit,
delay_threshold)


class PageIterator(object):
"""An iterator over the pages of a page streaming API call.

Provides access to the individual pages of the call, as well as the page
token.

Attributes:
response: The full response message for the call most recently made, or
None if a call has not yet been made.
page_token: The page token to be passed in the request for the next call
to be made.
"""
# pylint: disable=too-few-public-methods
def __init__(self, api_call, page_descriptor, page_token, request, **kwargs):
"""Constructor.

Args:
api_call (callable[[req], resp]): an API call that is page
streaming.
page_descriptor (:class:`PageDescriptor`): indicates the structure
of page streaming to be performed.
page_token (str): The page token to be passed to API call request.
If no page token has yet been acquired, this field should be set
to ``INITIAL_PAGE``.
request (object): The request to be passed to the API call. The page
token field of the request is overwritten by the ``page_token``
passed to the constructor, unless ``page_token`` is
``INITIAL_PAGE``.
**kwargs: Arbitrary keyword arguments to be passed to the API call.

Returns:
A PageIterator object.
"""
self.response = None
self.page_token = page_token
self._func = api_call
self._page_descriptor = page_descriptor
self._request = request
self._kwargs = kwargs
self._done = False

def __iter__(self):
return self

def next(self):
"""Retrieves the next page."""
if self._done:
raise StopIteration
if self.page_token != INITIAL_PAGE:
setattr(self._request,
self._page_descriptor.request_page_token_field,
self.page_token)
response = self._func(self._request, **self._kwargs)
self.page_token = getattr(
response, self._page_descriptor.response_page_token_field)
if not self.page_token:
self._done = True
return getattr(response, self._page_descriptor.resource_field)
52 changes: 33 additions & 19 deletions google/gax/api_callable.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import time

from . import (BackoffSettings, BundleOptions, bundling, CallSettings, config,
OPTION_INHERIT, RetryOptions)
OPTION_INHERIT, PageIterator, RetryOptions)
from .errors import GaxError, RetryError

_MILLIS_PER_SECOND = 1000
Expand Down Expand Up @@ -161,36 +161,46 @@ def inner(*args, **kwargs):
return inner


def _page_streamable(a_func,
request_page_token_field,
response_page_token_field,
resource_field):
def _page_streamable(a_func, page_descriptor, page_token=None,
flatten_pages=True):
"""Creates a function that yields an iterable to performs page-streaming.

Args:
a_func: an API call that is page streaming.
request_page_token_field: The field of the page token in the request.
response_page_token_field: The field of the next page token in the
response.
resource_field: The field to be streamed.
a_func (callable[[req], resp]): an API call that is page streaming.
page_descriptor (:class:`PageDescriptor`): indicates the structure
of page streaming to be performed.
page_token (str): Optional. If set and page streaming is over pages of
the response, indicates the page_token to be passed to the API call.
flatten_pages (bool): Optional. If set, the returned iterable is over
``resource_field``; otherwise the returned iterable is over the pages
of the response, each of which is an iterable over ``resource_field``.

Returns:
A function that returns an iterable over the specified field.
A function that returns an iterable.
"""

def inner(*args, **kwargs):
def flattened(*args, **kwargs):
"""A generator that yields all the paged responses."""
request = args[0]
while True:
response = a_func(request, **kwargs)
for obj in getattr(response, resource_field):
for obj in getattr(response, page_descriptor.resource_field):
yield obj
next_page_token = getattr(response, response_page_token_field)
next_page_token = getattr(
response, page_descriptor.response_page_token_field)
if not next_page_token:
break
setattr(request, request_page_token_field, next_page_token)
setattr(request,
page_descriptor.request_page_token_field,
next_page_token)

return inner
def unflattened(*args, **kwargs):
"""A generator that yields individual pages."""
request = args[0]
return PageIterator(
a_func, page_descriptor, page_token, request, **kwargs)

return flattened if flatten_pages else unflattened


def _construct_bundling(method_config, method_bundling_override,
Expand Down Expand Up @@ -459,11 +469,15 @@ def create_api_call(func, settings):
if settings.bundler and settings.bundle_descriptor:
raise ValueError('The API call has incompatible settings: '
'bundling and page streaming')
if settings.flatten_pages is None:
flatten_pages = True
else:
flatten_pages = settings.flatten_pages
return _page_streamable(
api_call,
settings.page_descriptor.request_page_token_field,
settings.page_descriptor.response_page_token_field,
settings.page_descriptor.resource_field)
settings.page_descriptor,
page_token=settings.page_token,
flatten_pages=flatten_pages)

if settings.bundler and settings.bundle_descriptor:
return _bundleable(api_call, settings.bundle_descriptor,
Expand Down
48 changes: 39 additions & 9 deletions test/test_api_callable.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from google.gax import (
api_callable, bundling, BackoffSettings, BundleDescriptor, BundleOptions,
CallSettings, PageDescriptor, RetryOptions)
CallSettings, INITIAL_PAGE, PageDescriptor, RetryOptions)
from google.gax.errors import GaxError, RetryError


Expand Down Expand Up @@ -267,6 +267,7 @@ def test_page_streaming(self):
# integers, returning `page_size` integers with each call and using
# the next integer to return as the page token, until `pages_to_stream`
# pages have been returned.
# pylint:disable=too-many-locals
page_size = 3
pages_to_stream = 5

Expand All @@ -284,27 +285,56 @@ def __init__(self, nums=(), next_page_token=0):
'page_token', 'next_page_token', 'nums')

def grpc_return_value(request, *dummy_args, **dummy_kwargs):
if (request.page_token > 0 and
request.page_token < page_size * pages_to_stream):
start = int(request.page_token) if request.page_token else 0
if start > 0 and start < page_size * pages_to_stream:
return PageStreamingResponse(
nums=iter(range(request.page_token,
request.page_token + page_size)),
next_page_token=request.page_token + page_size)
elif request.page_token >= page_size * pages_to_stream:
nums=list(range(start,
start + page_size)),
next_page_token=start + page_size)
elif start >= page_size * pages_to_stream:
return PageStreamingResponse()
else:
return PageStreamingResponse(nums=iter(range(page_size)),
return PageStreamingResponse(nums=list(range(page_size)),
next_page_token=page_size)

with mock.patch('grpc.framework.crust.implementations.'
'_UnaryUnaryMultiCallable') as mock_grpc:
mock_grpc.side_effect = grpc_return_value
settings = CallSettings(
page_descriptor=fake_grpc_func_descriptor, timeout=0)
my_callable = api_callable.create_api_call(mock_grpc, settings=settings)
my_callable = api_callable.create_api_call(
mock_grpc, settings=settings)
self.assertEqual(list(my_callable(PageStreamingRequest())),
list(range(page_size * pages_to_stream)))

unflattened_settings = CallSettings(
page_descriptor=fake_grpc_func_descriptor, timeout=0,
flatten_pages=False, page_token=INITIAL_PAGE)
unflattened_callable = api_callable.create_api_call(
mock_grpc, settings=unflattened_settings)
# Expect a list of pages_to_stream pages, each of size page_size,
# plus one empty page
expected = [list(range(page_size * n, page_size * (n + 1)))
for n in range(pages_to_stream)] + [()]
self.assertEqual(list(unflattened_callable(PageStreamingRequest())),
expected)

pages_already_read = 2
explicit_page_token_settings = CallSettings(
page_descriptor=fake_grpc_func_descriptor, timeout=0,
flatten_pages=False,
page_token=str(page_size * pages_already_read))
explicit_page_token_callable = api_callable.create_api_call(
mock_grpc, settings=explicit_page_token_settings)
# Expect a list of pages_to_stream pages, each of size page_size,
# plus one empty page, minus the pages_already_read
expected = [list(range(page_size * n, page_size * (n + 1)))
for n in range(pages_already_read, pages_to_stream)]
expected += [()]
self.assertEqual(
list(explicit_page_token_callable(PageStreamingRequest())),
expected)

def test_bundling_page_streaming_error(self):
settings = CallSettings(
page_descriptor=object(), bundle_descriptor=object(),
Expand Down
15 changes: 10 additions & 5 deletions test/test_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import unittest2

from google.gax import (
BundleOptions, CallOptions, CallSettings, OPTION_INHERIT, RetryOptions)
BundleOptions, CallOptions, CallSettings, INITIAL_PAGE, OPTION_INHERIT,
RetryOptions)


class TestBundleOptions(unittest2.TestCase):
Expand Down Expand Up @@ -63,7 +64,7 @@ def test_call_options_simple(self):
options = CallOptions(timeout=23)
self.assertEqual(options.timeout, 23)
self.assertEqual(options.retry, OPTION_INHERIT)
self.assertEqual(options.is_page_streaming, OPTION_INHERIT)
self.assertEqual(options.page_token, OPTION_INHERIT)

def test_settings_merge_options1(self):
retry = RetryOptions(None, None)
Expand All @@ -85,11 +86,15 @@ def test_settings_merge_options2(self):

def test_settings_merge_options_page_streaming(self):
retry = RetryOptions(None, None)
options = CallOptions(timeout=46, is_page_streaming=False)
settings = CallSettings(timeout=9, retry=retry)
page_descriptor = object()
options = CallOptions(timeout=46, page_token=INITIAL_PAGE)
settings = CallSettings(timeout=9, retry=retry,
page_descriptor=page_descriptor)
final = settings.merge(options)
self.assertEqual(final.timeout, 46)
self.assertIsNone(final.page_descriptor)
self.assertEqual(final.page_descriptor, page_descriptor)
self.assertEqual(final.page_token, INITIAL_PAGE)
self.assertFalse(final.flatten_pages)
self.assertEqual(final.retry, retry)

def test_settings_merge_none(self):
Expand Down