18 changes: 12 additions & 6 deletions system_tests/attempt_system_tests.py
@@ -26,17 +26,18 @@
 import subprocess
 import sys

+from run_system_test import FailedSystemTestModule
 from run_system_test import run_module_tests


-MODULES = (
-    'bigquery',
+MODULES = (  # ordered from most to least stable
     'datastore',
-    'logging',
-    'monitoring',
-    'pubsub',
     'storage',
+    'bigquery',
+    'pubsub',
+    'logging',
     'translate',
+    'monitoring',
 )
 if sys.version_info[:2] == (2, 7):
     MODULES += ('bigtable', 'bigtable-happybase')
@@ -111,9 +112,14 @@ def prepare_to_run():
 def main():
     """Run all the system tests if necessary."""
     prepare_to_run()
+    failed_modules = 0
     for module in MODULES:
-        run_module_tests(module)
+        try:
+            run_module_tests(module)
+        except FailedSystemTestModule:
+            failed_modules += 1

+    sys.exit(failed_modules)

 if __name__ == '__main__':
     main()
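Note on the main() change: the process exit status is now the number of modules whose system tests failed, so CI sees a non-zero (failing) status whenever anything breaks, while every module still gets a chance to run. A minimal sketch of the run_system_test pieces this relies on; the names mirror the imports above, but the subprocess invocation and the --package flag are illustrative assumptions, not the real implementation:

    import subprocess
    import sys


    class FailedSystemTestModule(Exception):
        """Raised when one module's system tests fail."""


    def run_module_tests(module):
        # Hypothetical: run one module's tests in a child process and
        # translate a non-zero exit status into an exception that the
        # caller can count.
        status = subprocess.call(
            [sys.executable, 'run_system_test.py', '--package', module])
        if status != 0:
            raise FailedSystemTestModule(module)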
51 changes: 27 additions & 24 deletions system_tests/bigquery.py
@@ -13,7 +13,6 @@
 # limitations under the License.

 import operator
-import time

 import unittest

@@ -22,7 +21,9 @@
 from gcloud import bigquery
 from gcloud.exceptions import Forbidden

-from retry import Retry
+from retry import RetryErrors
+from retry import RetryInstanceState
+from retry import RetryResult
 from system_test_utils import unique_resource_id


@@ -86,7 +87,13 @@ def test_patch_dataset(self):
     def test_update_dataset(self):
         dataset = Config.CLIENT.dataset(DATASET_NAME)
         self.assertFalse(dataset.exists())
-        dataset.create()
+
+        # We need to wait to stay within the rate limits.
+        # The alternative outcome is a 403 Forbidden response from upstream.
+        # See: https://cloud.google.com/bigquery/quota-policy
+        retry = RetryErrors(Forbidden, max_tries=2, delay=30)
+        retry(dataset.create)()
+
         self.to_delete.append(dataset)
         self.assertTrue(dataset.exists())
         after = [grant for grant in dataset.access_grants
@@ -96,11 +103,8 @@ def test_update_dataset(self):
         # We need to wait to stay within the rate limits.
         # The alternative outcome is a 403 Forbidden response from upstream.
         # See: https://cloud.google.com/bigquery/quota-policy
-        @Retry(Forbidden, tries=2, delay=30)
-        def update_dataset():
-            dataset.update()
+        retry(dataset.update)()

-        update_dataset()
         self.assertEqual(len(dataset.access_grants), len(after))
         for found, expected in zip(dataset.access_grants, after):
             self.assertEqual(found.role, expected.role)
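For readers unfamiliar with the new helpers: RetryErrors builds a decorator that re-invokes the wrapped callable whenever it raises the given exception type, sleeping between attempts. A minimal sketch consistent with the call sites in this diff; the doubling backoff and the defaults are assumptions about retry.py, not confirmed details:

    import time
    from functools import wraps


    class RetryErrors(object):
        """Retry a callable while it raises the given exception type."""

        def __init__(self, exception, error_predicate=None,
                     max_tries=4, delay=1):
            self.exception = exception
            # By default, every matching exception is retryable.
            self.error_predicate = error_predicate or (lambda exc: True)
            self.max_tries = max_tries
            self.delay = delay

        def __call__(self, to_wrap):
            @wraps(to_wrap)
            def wrapped(*args, **kwargs):
                delay = self.delay
                for attempt in range(self.max_tries):
                    try:
                        return to_wrap(*args, **kwargs)
                    except self.exception as exc:
                        # Give up on the last attempt, or when the
                        # predicate rejects the error as non-retryable.
                        if (attempt + 1 == self.max_tries or
                                not self.error_predicate(exc)):
                            raise
                    time.sleep(delay)
                    delay *= 2
            return wrapped

With max_tries=2 and delay=30, the retry object created in test_update_dataset gives dataset.create and dataset.update exactly one retry after a 30-second pause, which is what the quota comment calls for.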
@@ -202,11 +206,9 @@ def test_update_table(self):
         # We need to wait to stay within the rate limits.
         # The alternative outcome is a 403 Forbidden response from upstream.
         # See: https://cloud.google.com/bigquery/quota-policy
-        @Retry(Forbidden, tries=2, delay=30)
-        def create_dataset():
-            dataset.create()
+        retry = RetryErrors(Forbidden, max_tries=2, delay=30)
+        retry(dataset.create)()

-        create_dataset()
         self.to_delete.append(dataset)
         TABLE_NAME = 'test_table'
         full_name = bigquery.SchemaField('full_name', 'STRING',
@@ -261,15 +263,15 @@ def test_load_table_then_dump_table(self):
         self.assertEqual(len(errors), 0)

         rows = ()
-        counter = 9
+
+        def _has_rows(result):
+            return len(result[0]) > 0

         # Allow for 90 seconds of "warm up" before rows visible.  See:
         # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
-        while len(rows) == 0 and counter > 0:
-            counter -= 1
-            rows, _, _ = table.fetch_data()
-            if len(rows) == 0:
-                time.sleep(10)
+        # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
+        retry = RetryResult(_has_rows, max_tries=8)
+        rows, _, _ = retry(table.fetch_data)()

         by_age = operator.itemgetter(1)
         self.assertEqual(sorted(rows, key=by_age),
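RetryResult inverts the pattern: rather than retrying on an exception, it re-invokes the callable until the result satisfies a predicate. A sketch, with the doubling delay inferred from the "1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds" comment (eight calls leave room for seven sleeps):

    import time


    class RetryResult(object):
        """Retry a callable until its result satisfies a predicate."""

        def __init__(self, result_predicate, max_tries=4, delay=1):
            self.result_predicate = result_predicate
            self.max_tries = max_tries
            self.delay = delay

        def __call__(self, to_wrap):
            def wrapped(*args, **kwargs):
                delay = self.delay
                for attempt in range(self.max_tries):
                    result = to_wrap(*args, **kwargs)
                    if self.result_predicate(result):
                        return result
                    if attempt + 1 < self.max_tries:
                        time.sleep(delay)
                        delay *= 2
                raise RuntimeError('Retry predicate was never satisfied.')
            return wrapped

Here _has_rows inspects the first element (the rows) of the tuple returned by table.fetch_data(), so the test keeps polling until the streamed rows become visible.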
@@ -329,13 +331,14 @@ def test_load_table_from_storage_then_dump_table(self):

         job.begin()

-        counter = 9  # Allow for 90 seconds of lag.
+        def _job_done(instance):
+            return instance.state in ('DONE', 'done')

-        while job.state not in ('DONE', 'done') and counter > 0:
-            counter -= 1
-            job.reload()
-            if job.state not in ('DONE', 'done'):
-                time.sleep(10)
+        # Allow for 90 seconds of "warm up" before rows visible.  See:
+        # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
+        # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
+        retry = RetryInstanceState(_job_done, max_tries=8)
+        retry(job.reload)()

         self.assertTrue(job.state in ('DONE', 'done'))

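RetryInstanceState covers a third shape: job.reload() returns nothing useful, and what the test cares about is the state of the job object itself. A sketch that assumes the wrapped callable is a bound method, so the instance to inspect can be recovered from __self__; that recovery trick is an assumption about retry.py, not a confirmed detail:

    import time


    class RetryInstanceState(object):
        """Retry a bound method until its instance reaches a wanted state."""

        def __init__(self, instance_predicate, max_tries=4, delay=1):
            self.instance_predicate = instance_predicate
            self.max_tries = max_tries
            self.delay = delay

        def __call__(self, to_wrap):
            instance = to_wrap.__self__  # the object whose state we poll

            def wrapped(*args, **kwargs):
                delay = self.delay
                for attempt in range(self.max_tries):
                    result = to_wrap(*args, **kwargs)
                    if self.instance_predicate(instance):
                        return result
                    if attempt + 1 < self.max_tries:
                        time.sleep(delay)
                        delay *= 2
                raise RuntimeError('Instance never reached the wanted state.')
            return wrapped

retry(job.reload)() therefore re-fetches the job until _job_done sees a terminal state, replacing the hand-rolled counter-and-sleep loop.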
36 changes: 13 additions & 23 deletions system_tests/bigtable.py
@@ -14,7 +14,6 @@

 import datetime
 import operator
-import time

 import unittest

@@ -32,6 +31,8 @@
 from gcloud.bigtable.row_data import PartialRowData
 from gcloud.environment_vars import TESTS_PROJECT

+from retry import RetryErrors
+from retry import RetryResult
 from system_test_utils import unique_resource_id


@@ -75,39 +76,28 @@ def _operation_wait(operation, max_attempts=5):
     :rtype: bool
     :returns: Boolean indicating if the operation finished.
     """
-    total_sleep = 0
-    while not operation.finished():
-        if total_sleep > max_attempts:
-            return False
-        time.sleep(1)
-        total_sleep += 1
-
-    return True
+    def _operation_finished(result):
+        return result
+
+    retry = RetryResult(_operation_finished, max_tries=max_attempts)
+    return retry(operation.finished)()


-def _retry_backoff(meth, *args, **kw):
+def _retry_on_unavailable(exc):
+    """Retry only AbortionErrors whose status code is 'UNAVAILABLE'."""
     from grpc.beta.interfaces import StatusCode
-    from grpc.framework.interfaces.face.face import AbortionError
-    backoff_intervals = [1, 2, 4, 8]
-    while True:
-        try:
-            return meth(*args, **kw)
-        except AbortionError as error:
-            if error.code != StatusCode.UNAVAILABLE:
-                raise
-            if backoff_intervals:
-                time.sleep(backoff_intervals.pop(0))
-            else:
-                raise
+    return exc.code == StatusCode.UNAVAILABLE


 def setUpModule():
+    from grpc.framework.interfaces.face.face import AbortionError
     _helpers.PROJECT = TESTS_PROJECT
     Config.CLIENT = Client(admin=True)
     Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID, LOCATION_ID)
     Config.CLIENT.start()
-    instances, failed_locations = _retry_backoff(
-        Config.CLIENT.list_instances)
+    retry = RetryErrors(AbortionError, error_predicate=_retry_on_unavailable)
+    instances, failed_locations = retry(Config.CLIENT.list_instances)()

     if len(failed_locations) != 0:
         raise ValueError('List instances failed in module set up.')
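The bigtable changes exercise the error_predicate hook: only AbortionErrors whose code is UNAVAILABLE should be retried; anything else is a genuine failure and must propagate immediately. A self-contained toy run against the RetryErrors sketch above; FakeAbortionError and the flaky function are made up for illustration:

    UNAVAILABLE = 'UNAVAILABLE'


    class FakeAbortionError(Exception):
        """Stand-in for grpc's AbortionError, which carries a .code."""

        def __init__(self, code):
            super(FakeAbortionError, self).__init__(code)
            self.code = code


    def _retry_on_unavailable(exc):
        return exc.code == UNAVAILABLE


    calls = []


    def flaky_list_instances():
        calls.append(1)
        if len(calls) < 3:
            raise FakeAbortionError(UNAVAILABLE)  # retried by predicate
        return (['instance-1'], [])


    retry = RetryErrors(FakeAbortionError,
                        error_predicate=_retry_on_unavailable)
    instances, failed_locations = retry(flaky_list_instances)()
    assert instances == ['instance-1'] and len(calls) == 3

An error with any other code would fail the predicate and re-raise on the first attempt instead of sleeping.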
69 changes: 29 additions & 40 deletions system_tests/logging_.py
@@ -12,14 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import time
-
 import unittest

 from gcloud import _helpers
 from gcloud.environment_vars import TESTS_PROJECT
 from gcloud import logging

+from retry import RetryErrors
+from retry import RetryResult
 from system_test_utils import unique_resource_id


@@ -33,27 +33,10 @@
 TOPIC_NAME = 'gcloud-python-system-testing%s' % (_RESOURCE_ID,)


-def _retry_backoff(result_predicate, meth, *args, **kw):
+def _retry_on_unavailable(exc):
+    """Retry only AbortionErrors whose status code is 'UNAVAILABLE'."""
     from grpc.beta.interfaces import StatusCode
-    from grpc.framework.interfaces.face.face import AbortionError
-    backoff_intervals = [1, 2, 4, 8]
-    while True:
-        try:
-            result = meth(*args, **kw)
-        except AbortionError as error:
-            if error.code != StatusCode.UNAVAILABLE:
-                raise
-            if backoff_intervals:
-                time.sleep(backoff_intervals.pop(0))
-                continue
-            else:
-                raise
-        if result_predicate(result):
-            return result
-        if backoff_intervals:
-            time.sleep(backoff_intervals.pop(0))
-        else:
-            raise RuntimeError('%s: %s %s' % (meth, args, kw))
+    return exc.code == StatusCode.UNAVAILABLE


 def _has_entries(result):
@@ -81,28 +64,26 @@ def setUp(self):

     def tearDown(self):
         from gcloud.exceptions import NotFound
+        retry = RetryErrors(NotFound)
         for doomed in self.to_delete:
-            backoff_intervals = [1, 2, 4, 8]
-            while True:
-                try:
-                    doomed.delete()
-                    break
-                except NotFound:
-                    if backoff_intervals:
-                        time.sleep(backoff_intervals.pop(0))
-                    else:
-                        raise
+            retry(doomed.delete)()

     @staticmethod
     def _logger_name():
         return 'system-tests-logger' + unique_resource_id('-')

+    def _list_entries(self, logger):
+        from grpc.framework.interfaces.face.face import AbortionError
+        inner = RetryResult(_has_entries)(logger.list_entries)
+        outer = RetryErrors(AbortionError, _retry_on_unavailable)(inner)
+        return outer()
+
     def test_log_text(self):
         TEXT_PAYLOAD = 'System test: test_log_text'
         logger = Config.CLIENT.logger(self._logger_name())
         self.to_delete.append(logger)
         logger.log_text(TEXT_PAYLOAD)
-        entries, _ = _retry_backoff(_has_entries, logger.list_entries)
+        entries, _ = self._list_entries(logger)
         self.assertEqual(len(entries), 1)
         self.assertEqual(entries[0].payload, TEXT_PAYLOAD)
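The new _list_entries helper composes the two retry flavors, and the nesting order matters: result polling is innermost, so a transient UNAVAILABLE raised mid-poll escapes to the outer error retry, which restarts the whole polling sequence. A toy trace using the sketches above (FakeAbortionError comes from the bigtable note; _has_entries here is an illustrative version, since its real body is not shown in this diff):

    def _has_entries(result):
        entries, _ = result
        return len(entries) > 0


    attempts = []


    def fake_list_entries():
        attempts.append(1)
        if len(attempts) == 1:
            raise FakeAbortionError(UNAVAILABLE)  # outer retry restarts us
        if len(attempts) == 2:
            return ([], None)                     # inner retry polls again
        return (['entry'], None)


    inner = RetryResult(_has_entries)(fake_list_entries)
    outer = RetryErrors(FakeAbortionError, _retry_on_unavailable)(inner)
    entries, _ = outer()
    assert entries == ['entry'] and len(attempts) == 3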

@@ -123,7 +104,7 @@ def test_log_text_w_metadata(self):

         logger.log_text(TEXT_PAYLOAD, insert_id=INSERT_ID, severity=SEVERITY,
                         http_request=REQUEST)
-        entries, _ = _retry_backoff(_has_entries, logger.list_entries)
+        entries, _ = self._list_entries(logger)

         self.assertEqual(len(entries), 1)

@@ -146,7 +127,7 @@ def test_log_struct(self):
         self.to_delete.append(logger)

         logger.log_struct(JSON_PAYLOAD)
-        entries, _ = _retry_backoff(_has_entries, logger.list_entries)
+        entries, _ = self._list_entries(logger)

         self.assertEqual(len(entries), 1)
         self.assertEqual(entries[0].payload, JSON_PAYLOAD)
@@ -171,7 +152,7 @@ def test_log_struct_w_metadata(self):

         logger.log_struct(JSON_PAYLOAD, insert_id=INSERT_ID, severity=SEVERITY,
                           http_request=REQUEST)
-        entries, _ = _retry_backoff(_has_entries, logger.list_entries)
+        entries, _ = self._list_entries(logger)

         self.assertEqual(len(entries), 1)
         self.assertEqual(entries[0].payload, JSON_PAYLOAD)
@@ -205,10 +186,12 @@ def test_list_metrics(self):
                          set([DEFAULT_METRIC_NAME]))

     def test_reload_metric(self):
+        from gcloud.exceptions import Conflict
+        retry = RetryErrors(Conflict)
         metric = Config.CLIENT.metric(
             DEFAULT_METRIC_NAME, DEFAULT_FILTER, DEFAULT_DESCRIPTION)
         self.assertFalse(metric.exists())
-        metric.create()
+        retry(metric.create)()

         self.to_delete.append(metric)
         metric.filter_ = 'logName:other'
         metric.description = 'local changes'
@@ -217,12 +200,14 @@ def test_update_metric(self):
         self.assertEqual(metric.description, DEFAULT_DESCRIPTION)

     def test_update_metric(self):
+        from gcloud.exceptions import Conflict
+        retry = RetryErrors(Conflict)
         NEW_FILTER = 'logName:other'
         NEW_DESCRIPTION = 'updated'
         metric = Config.CLIENT.metric(
             DEFAULT_METRIC_NAME, DEFAULT_FILTER, DEFAULT_DESCRIPTION)
         self.assertFalse(metric.exists())
-        metric.create()
+        retry(metric.create)()

         self.to_delete.append(metric)
         metric.filter_ = NEW_FILTER
         metric.description = NEW_DESCRIPTION
@@ -324,10 +309,12 @@ def test_list_sinks(self):
                          set([DEFAULT_SINK_NAME]))

     def test_reload_sink(self):
+        from gcloud.exceptions import Conflict
+        retry = RetryErrors(Conflict)
         uri = self._init_bigquery_dataset()
         sink = Config.CLIENT.sink(DEFAULT_SINK_NAME, DEFAULT_FILTER, uri)
         self.assertFalse(sink.exists())
-        sink.create()
+        retry(sink.create)()

         self.to_delete.append(sink)
         sink.filter_ = 'BOGUS FILTER'
         sink.destination = 'BOGUS DESTINATION'
@@ -336,13 +323,15 @@ def test_update_sink(self):
         self.assertEqual(sink.destination, uri)

     def test_update_sink(self):
+        from gcloud.exceptions import Conflict
+        retry = RetryErrors(Conflict)
         bucket_uri = self._init_storage_bucket()
         dataset_uri = self._init_bigquery_dataset()
         UPDATED_FILTER = 'logName:syslog'
         sink = Config.CLIENT.sink(
             DEFAULT_SINK_NAME, DEFAULT_FILTER, bucket_uri)
         self.assertFalse(sink.exists())
-        sink.create()
+        retry(sink.create)()
         self.to_delete.append(sink)
         sink.filter_ = UPDATED_FILTER
         sink.destination = dataset_uri