From ba4f07e001fbd93bc63dccf49c23eb48a43ea7a5 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Tue, 3 Oct 2017 15:16:01 -0700 Subject: [PATCH 01/17] Retry for rows that return retryable error codes. --- bigtable/google/cloud/bigtable/table.py | 93 +++++++++++++++++++++---- 1 file changed, 81 insertions(+), 12 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index d1711f5be704..0d7890b2ce10 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -15,7 +15,9 @@ """User-friendly container for Google Cloud Bigtable Table.""" +import random import six +import time from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( @@ -30,12 +32,15 @@ from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow from google.cloud.bigtable.row_data import PartialRowsData +from grpc import StatusCode # Maximum number of mutations in bulk (MutateRowsRequest message): # https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest _MAX_BULK_MUTATIONS = 100000 +_MILLIS_PER_SECOND = 1000 + class TableMismatchError(ValueError): """Row from another table.""" @@ -312,18 +317,24 @@ def mutate_rows(self, rows): corresponding to success or failure of each row mutation sent. These will be in the same order as the `rows`. """ - mutate_rows_request = _mutate_rows_request(self.name, rows) - client = self._instance._client - responses = client._data_stub.MutateRows(mutate_rows_request) - - responses_statuses = [ - None for _ in six.moves.xrange(len(mutate_rows_request.entries))] - for response in responses: - for entry in response.entries: - responses_statuses[entry.index] = entry.status - if entry.status.code == 0: - rows[entry.index].clear() - return responses_statuses + delay_millis = 1000 + delay_mult = 1.5 + max_delay_millis = 15 * 1000 + total_timeout_millis = 5 * 60 * 1000 + + now = time.time() + deadline = now + total_timeout_millis / _MILLIS_PER_SECOND + + retryable_mutate_rows = _RetryableMutateRowsWorker(self._instance._client, self.name, rows) + while now < deadline: + try: + return retryable_mutate_rows() + except _MutateRowsRetryableError: + to_sleep = random.uniform(0, delay_millis * 2) + time.sleep(to_sleep / _MILLIS_PER_SECOND) + delay_millis = min(delay_millis * delay_mult, max_delay_millis) + now = time.time() + return retryable_mutate_rows.responses_statuses def sample_row_keys(self): """Read a sample of row keys in the table. @@ -363,6 +374,64 @@ def sample_row_keys(self): return response_iterator +class _MutateRowsRetryableError(Exception): + """A retryable error in Mutate Rows response.""" + pass + + +class _RetryableMutateRowsWorker(object): + RETRY_CODES = ( + StatusCode.DEADLINE_EXCEEDED.value[0], + StatusCode.ABORTED.value[0], + StatusCode.INTERNAL.value[0], + StatusCode.UNAVAILABLE.value[0], + ) + + def __init__(self, client, table_name, rows): + self.client = client + self.table_name = table_name + self.rows = rows + self.responses_statuses = [None for _ in six.moves.xrange(len(self.rows))] + + def __call__(self): + return self._do_mutate_retryable_rows() + + def _is_retryable(self, status): + return status is None or status.code in _RetryableMutateRowsWorker.RETRY_CODES + + def _next_retryable_row_index(self, begin_index): + i = begin_index + while i < len(self.responses_statuses): + status = self.responses_statuses[i] + if self._is_retryable(status): + return i + i =+ 1 + return i + + def _do_mutate_retryable_rows(self): + curr_rows = [] + for i, status in enumerate(self.responses_statuses): + if self._is_retryable(status): + curr_rows.append(self.rows[i]) + mutate_rows_request = _mutate_rows_request(self.table_name, curr_rows) + responses = self.client._data_stub.MutateRows(mutate_rows_request) + + has_retryable_responses = False + for response in responses: + index = 0 + for entry in response.entries: + index = self._next_retryable_row_index(index) + self.responses_statuses[index] = entry.status + has_retryable_responses = self._is_retryable(entry.status) + if entry.status.code == 0: + self.rows[index].clear() + index += 1 + + if has_retryable_responses: + raise _MutateRowsRetryableError() + return self.responses_statuses + + def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, filter_=None, limit=None, end_inclusive=False): """Creates a request to read rows in a table. From 5fdb37a3e9c7f7b3122489340eb8dbb5d1552f6f Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Tue, 3 Oct 2017 15:44:42 -0700 Subject: [PATCH 02/17] Stop processing once all rows are successful or non-retryable. --- bigtable/google/cloud/bigtable/table.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 0d7890b2ce10..95d0876dd807 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -409,11 +409,16 @@ def _next_retryable_row_index(self, begin_index): return i def _do_mutate_retryable_rows(self): - curr_rows = [] + retryable_rows = [] for i, status in enumerate(self.responses_statuses): if self._is_retryable(status): - curr_rows.append(self.rows[i]) - mutate_rows_request = _mutate_rows_request(self.table_name, curr_rows) + retryable_rows.append(self.rows[i]) + + if not retryable_rows: + # All mutations are either successful or non-retryable now. + return self.responses_statuses + + mutate_rows_request = _mutate_rows_request(self.table_name, retryable_rows) responses = self.client._data_stub.MutateRows(mutate_rows_request) has_retryable_responses = False From 2671950d358cc3713ed61e61fd075f7cb8e008d2 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Tue, 3 Oct 2017 17:01:09 -0700 Subject: [PATCH 03/17] Remove retry for INTERNAL error. Fix the case where has_retryable_responses can get overridden by non-retryableresponse. --- bigtable/google/cloud/bigtable/table.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 95d0876dd807..09e9036da075 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -383,7 +383,6 @@ class _RetryableMutateRowsWorker(object): RETRY_CODES = ( StatusCode.DEADLINE_EXCEEDED.value[0], StatusCode.ABORTED.value[0], - StatusCode.INTERNAL.value[0], StatusCode.UNAVAILABLE.value[0], ) @@ -421,18 +420,19 @@ def _do_mutate_retryable_rows(self): mutate_rows_request = _mutate_rows_request(self.table_name, retryable_rows) responses = self.client._data_stub.MutateRows(mutate_rows_request) - has_retryable_responses = False + num_retryable_responses = 0 for response in responses: index = 0 for entry in response.entries: index = self._next_retryable_row_index(index) self.responses_statuses[index] = entry.status - has_retryable_responses = self._is_retryable(entry.status) + if self._is_retryable(entry.status): + num_retryable_responses += 1 if entry.status.code == 0: self.rows[index].clear() index += 1 - if has_retryable_responses: + if num_retryable_responses: raise _MutateRowsRetryableError() return self.responses_statuses From 372a1934117e9645990b23ccc08e8fdc3d89985b Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 4 Oct 2017 17:43:33 -0700 Subject: [PATCH 04/17] Fix lint errors. --- bigtable/google/cloud/bigtable/table.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 09e9036da075..b3d65276f695 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -16,8 +16,8 @@ import random -import six import time +import six from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( @@ -325,7 +325,8 @@ def mutate_rows(self, rows): now = time.time() deadline = now + total_timeout_millis / _MILLIS_PER_SECOND - retryable_mutate_rows = _RetryableMutateRowsWorker(self._instance._client, self.name, rows) + retryable_mutate_rows = _RetryableMutateRowsWorker( + self._instance._client, self.name, rows) while now < deadline: try: return retryable_mutate_rows() @@ -380,6 +381,7 @@ class _MutateRowsRetryableError(Exception): class _RetryableMutateRowsWorker(object): + # pylint: disable=unsubscriptable-object RETRY_CODES = ( StatusCode.DEADLINE_EXCEEDED.value[0], StatusCode.ABORTED.value[0], @@ -390,13 +392,15 @@ def __init__(self, client, table_name, rows): self.client = client self.table_name = table_name self.rows = rows - self.responses_statuses = [None for _ in six.moves.xrange(len(self.rows))] + self.responses_statuses = [ + None for _ in six.moves.xrange(len(self.rows))] def __call__(self): return self._do_mutate_retryable_rows() - def _is_retryable(self, status): - return status is None or status.code in _RetryableMutateRowsWorker.RETRY_CODES + def _is_retryable(self, status): # pylint: disable=no-self-use + return (status is None or + status.code in _RetryableMutateRowsWorker.RETRY_CODES) def _next_retryable_row_index(self, begin_index): i = begin_index @@ -404,7 +408,7 @@ def _next_retryable_row_index(self, begin_index): status = self.responses_statuses[i] if self._is_retryable(status): return i - i =+ 1 + i += 1 return i def _do_mutate_retryable_rows(self): @@ -417,8 +421,10 @@ def _do_mutate_retryable_rows(self): # All mutations are either successful or non-retryable now. return self.responses_statuses - mutate_rows_request = _mutate_rows_request(self.table_name, retryable_rows) - responses = self.client._data_stub.MutateRows(mutate_rows_request) + mutate_rows_request = _mutate_rows_request( + self.table_name, retryable_rows) + responses = self.client._data_stub.MutateRows( + mutate_rows_request) num_retryable_responses = 0 for response in responses: From 2b27e3c00dd6661dd70b9be55c0671b78241aa2c Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Thu, 5 Oct 2017 17:51:38 -0700 Subject: [PATCH 05/17] Add tests for google.cloud.bigtable.table.Table.mutate_rows.\nCases for 1) no retry 2) one retry 3) timeout. --- bigtable/tests/unit/test_table.py | 65 +++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 3890d097f572..f495074bc60d 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -488,6 +488,66 @@ def test_mutate_rows(self): self.assertEqual(result, expected_result) + def test_retryable_mutate_rows_no_retry(self): + from google.rpc.status_pb2 import Status + + instance = mock.MagicMock() + table = self._make_one(self.TABLE_ID, instance) + + response = [Status(code=0), Status(code=1)] + + mock_worker = mock.Mock(side_effect=[response]) + with mock.patch( + 'google.cloud.bigtable.table._RetryableMutateRowsWorker', + new=mock.MagicMock(return_value=mock_worker)): + statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()]) + result = [status.code for status in statuses] + expected_result = [0, 1] + + self.assertEqual(result, expected_result) + + def test_retryable_mutate_rows_retry(self): + from google.cloud.bigtable.table import _MutateRowsRetryableError + from google.rpc.status_pb2 import Status + + instance = mock.MagicMock() + table = self._make_one(self.TABLE_ID, instance) + + response = [Status(code=0), Status(code=1)] + + mock_worker = mock.Mock(side_effect=[_MutateRowsRetryableError, response]) + with mock.patch( + 'google.cloud.bigtable.table._RetryableMutateRowsWorker', + new=mock.MagicMock(return_value=mock_worker)): + statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()]) + result = [status.code for status in statuses] + expected_result = [0, 1] + + self.assertEqual(result, expected_result) + + def test_retryable_mutate_rows_retry_timeout(self): + from google.cloud.bigtable.table import _MutateRowsRetryableError + from google.rpc.status_pb2 import Status + + instance = mock.MagicMock() + table = self._make_one(self.TABLE_ID, instance) + + response = [Status(code=0), Status(code=1)] + + mock_worker = mock.Mock( + side_effect=[_MutateRowsRetryableError, _MutateRowsRetryableError], + responses_statuses=response) + # total_timeout_millis = 5 * 60 * 1000 + mock_time = mock.Mock(side_effect=[0, 2000, 5 * 60 * 1000]) + with mock.patch( + 'google.cloud.bigtable.table._RetryableMutateRowsWorker', + new=mock.MagicMock(return_value=mock_worker)), mock.patch( + 'time.time', new=mock_time): + statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()]) + result = [status.code for status in statuses] + expected_result = [0, 1] + + self.assertEqual(result, expected_result) def test_read_rows(self): from google.cloud._testing import _Monkey @@ -744,6 +804,11 @@ def _ColumnFamilyPB(*args, **kw): return table_v2_pb2.ColumnFamily(*args, **kw) +def _MockRetryableMutateRowsWorker(): + from google.cloud.bigtable.table import _RetryableMutateRowsWorker + return mock.create_autospec(_RetryableMutateRowsWorker) + + class _Client(object): data_stub = None From 2b7c410bd6c427ac48f63aec94c7b9d182d79b3a Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 6 Oct 2017 15:46:04 -0700 Subject: [PATCH 06/17] Add test cases for _RetryableMutateRowsWorker. --- bigtable/google/cloud/bigtable/table.py | 2 +- bigtable/tests/unit/test_table.py | 265 +++++++++++++++++++++++- 2 files changed, 259 insertions(+), 8 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index b3d65276f695..961c9ca2693c 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -409,7 +409,7 @@ def _next_retryable_row_index(self, begin_index): if self._is_retryable(status): return i i += 1 - return i + return -1 def _do_mutate_retryable_rows(self): retryable_rows = [] diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index f495074bc60d..6f9a9849b43c 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -532,7 +532,7 @@ def test_retryable_mutate_rows_retry_timeout(self): instance = mock.MagicMock() table = self._make_one(self.TABLE_ID, instance) - response = [Status(code=0), Status(code=1)] + response = [Status(code=0), Status(code=4)] mock_worker = mock.Mock( side_effect=[_MutateRowsRetryableError, _MutateRowsRetryableError], @@ -545,7 +545,7 @@ def test_retryable_mutate_rows_retry_timeout(self): 'time.time', new=mock_time): statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()]) result = [status.code for status in statuses] - expected_result = [0, 1] + expected_result = [0, 4] self.assertEqual(result, expected_result) @@ -630,6 +630,262 @@ def test_sample_row_keys(self): )]) +class Test__RetryableMutateRowsWorker(unittest.TestCase): + PROJECT_ID = 'project-id' + INSTANCE_ID = 'instance-id' + INSTANCE_NAME = ('projects/' + PROJECT_ID + '/instances/' + INSTANCE_ID) + TABLE_ID = 'table-id' + + @staticmethod + def _get_target_class_for_worker(): + from google.cloud.bigtable.table import _RetryableMutateRowsWorker + + return _RetryableMutateRowsWorker + + def _make_worker(self, *args, **kwargs): + return self._get_target_class_for_worker()(*args, **kwargs) + + @staticmethod + def _get_target_class_for_table(): + from google.cloud.bigtable.table import Table + + return Table + + def _make_table(self, *args, **kwargs): + return self._get_target_class_for_table()(*args, **kwargs) + + def _make_responses_statuses(self, codes): + from google.rpc.status_pb2 import Status + + response = [Status(code=code) for code in codes] + return response + + def test_next_retryable_row_index_empty_rows(self): + worker = self._make_worker(mock.MagicMock(), mock.MagicMock(), []) + worker.responses_statuses = self._make_responses_statuses([]) + self.assertEqual(worker._next_retryable_row_index(0), -1) + self.assertEqual(worker._next_retryable_row_index(1), -1) + + def test_next_retryable_row_index(self): + worker = self._make_worker(mock.MagicMock(), mock.MagicMock(), []) + worker.responses_statuses = self._make_responses_statuses( + [4, 10, 14, 0, 4, 1]) + self.assertEqual(worker._next_retryable_row_index(0), 0) + self.assertEqual(worker._next_retryable_row_index(1), 1) + self.assertEqual(worker._next_retryable_row_index(2), 2) + self.assertEqual(worker._next_retryable_row_index(3), 4) + self.assertEqual(worker._next_retryable_row_index(4), 4) + self.assertEqual(worker._next_retryable_row_index(5), -1) + + def test_do_mutate_retryable_rows_empty_rows(self): + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + worker = self._make_worker(table._instance._client, table.name, []) + statuses = worker._do_mutate_retryable_rows() + + self.assertEqual(len(statuses), 0) + + def test_do_mutate_retryable_rows(self): + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + from tests.unit._testing import _FakeStub + + # Setup: + # - Mutate 2 rows. + # Action: + # - Initial attempt will mutate all 2 rows. + # Expectation: + # - Expect [success, non-retryable] + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + + response = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + MutateRowsResponse.Entry( + index=1, + status=Status(code=1), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = _FakeStub([response]) + + worker = self._make_worker(table._instance._client, table.name, [row_1, row_2]) + + statuses = worker._do_mutate_retryable_rows() + result = [status.code for status in statuses] + expected_result = [0, 1] + + self.assertEqual(result, expected_result) + + def test_do_mutate_retryable_rows_retry(self): + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.table import _MutateRowsRetryableError + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + from tests.unit._testing import _FakeStub + + # Setup: + # - Mutate 3 rows. + # Action: + # - Initial attempt will mutate all 3 rows. + # Expectation: + # - Second row returns retryable error code, so expect a raise. + # - State of responses_statuses should be + # [success, retryable, non-retryable] + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + row_3 = DirectRow(row_key=b'row_key_3', table=table) + row_3.set_cell('cf', b'col', b'value3') + + response = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + MutateRowsResponse.Entry( + index=1, + status=Status(code=4), + ), + MutateRowsResponse.Entry( + index=1, + status=Status(code=1), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = _FakeStub([response]) + + worker = self._make_worker(table._instance._client, + table.name, [row_1, row_2, row_3]) + + with self.assertRaises(_MutateRowsRetryableError): + worker._do_mutate_retryable_rows() + statuses = worker.responses_statuses + result = [status.code for status in statuses] + expected_result = [0, 4, 1] + + self.assertEqual(result, expected_result) + + def test_do_mutate_retryable_rows_second_try(self): + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + from tests.unit._testing import _FakeStub + + # Setup: + # - Mutate 4 rows. + # - First try results: + # [success, retryable, non-retryable, retryable] + # Action: + # - Second try should re-attempt the 'retryable' rows. + # Expectation: + # - After second try: + # [success, non-retryable, non-retryable, success] + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + row_3 = DirectRow(row_key=b'row_key_3', table=table) + row_3.set_cell('cf', b'col', b'value3') + row_4 = DirectRow(row_key=b'row_key_4', table=table) + row_4.set_cell('cf', b'col', b'value4') + + response = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=1), + ), + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = _FakeStub([response]) + + worker = self._make_worker(table._instance._client, + table.name, [row_1, row_2, row_3, row_4]) + worker.responses_statuses = self._make_responses_statuses( + [0, 4, 1, 10]) + + statuses = worker._do_mutate_retryable_rows() + result = [status.code for status in statuses] + expected_result = [0, 1, 1, 0] + + self.assertEqual(result, expected_result) + + def test_do_mutate_retryable_rows_second_try_no_retryable(self): + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + from tests.unit._testing import _FakeStub + + # Setup: + # - Mutate 2 rows. + # - First try results: [success, non-retryable] + # Action: + # - Second try has no row to retry. + # Expectation: + # - After second try: [success, non-retryable] + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + + worker = self._make_worker(table._instance._client, + table.name, [row_1, row_2]) + worker.responses_statuses = self._make_responses_statuses( + [0, 1]) + + statuses = worker._do_mutate_retryable_rows() + result = [status.code for status in statuses] + expected_result = [0, 1] + + self.assertEqual(result, expected_result) + + class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, @@ -804,11 +1060,6 @@ def _ColumnFamilyPB(*args, **kw): return table_v2_pb2.ColumnFamily(*args, **kw) -def _MockRetryableMutateRowsWorker(): - from google.cloud.bigtable.table import _RetryableMutateRowsWorker - return mock.create_autospec(_RetryableMutateRowsWorker) - - class _Client(object): data_stub = None From b78fd87fdd046af23bfcaecdc54b739fa9ca1e30 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 6 Oct 2017 17:40:55 -0700 Subject: [PATCH 07/17] Refactor test for _RetryableMutateRowsWorker to cover callable interface. --- bigtable/tests/unit/test_table.py | 41 ++----------------------------- 1 file changed, 2 insertions(+), 39 deletions(-) diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 6f9a9849b43c..70ba5780919f 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -451,43 +451,6 @@ def test_read_row_still_partial(self): with self.assertRaises(ValueError): self._read_row_helper(chunks, None) - def test_mutate_rows(self): - from google.cloud.bigtable._generated.bigtable_pb2 import ( - MutateRowsResponse) - from google.cloud.bigtable.row import DirectRow - from google.rpc.status_pb2 import Status - from tests.unit._testing import _FakeStub - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - row_1 = DirectRow(row_key=b'row_key', table=table) - row_1.set_cell('cf', b'col', b'value1') - row_2 = DirectRow(row_key=b'row_key_2', table=table) - row_2.set_cell('cf', b'col', b'value2') - - response = MutateRowsResponse( - entries=[ - MutateRowsResponse.Entry( - index=0, - status=Status(code=0), - ), - MutateRowsResponse.Entry( - index=1, - status=Status(code=1), - ), - ], - ) - - # Patch the stub used by the API method. - client._data_stub = _FakeStub([response]) - statuses = table.mutate_rows([row_1, row_2]) - result = [status.code for status in statuses] - expected_result = [0, 1] - - self.assertEqual(result, expected_result) - def test_retryable_mutate_rows_no_retry(self): from google.rpc.status_pb2 import Status @@ -677,13 +640,13 @@ def test_next_retryable_row_index(self): self.assertEqual(worker._next_retryable_row_index(4), 4) self.assertEqual(worker._next_retryable_row_index(5), -1) - def test_do_mutate_retryable_rows_empty_rows(self): + def test_callable_empty_rows(self): client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) table = self._make_table(self.TABLE_ID, instance) worker = self._make_worker(table._instance._client, table.name, []) - statuses = worker._do_mutate_retryable_rows() + statuses = worker() self.assertEqual(len(statuses), 0) From 218c5b740b9a3c747a2f0be453f47b3fd140278e Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 11 Oct 2017 15:16:59 -0700 Subject: [PATCH 08/17] Remove assumption that responses come back in the same linear order as request rows. --- bigtable/google/cloud/bigtable/table.py | 15 +++------------ bigtable/tests/unit/test_table.py | 21 ++------------------- 2 files changed, 5 insertions(+), 31 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 961c9ca2693c..23bcf05d0172 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -402,20 +402,13 @@ def _is_retryable(self, status): # pylint: disable=no-self-use return (status is None or status.code in _RetryableMutateRowsWorker.RETRY_CODES) - def _next_retryable_row_index(self, begin_index): - i = begin_index - while i < len(self.responses_statuses): - status = self.responses_statuses[i] - if self._is_retryable(status): - return i - i += 1 - return -1 - def _do_mutate_retryable_rows(self): retryable_rows = [] + index_into_all_rows = [] for i, status in enumerate(self.responses_statuses): if self._is_retryable(status): retryable_rows.append(self.rows[i]) + index_into_all_rows.append(i) if not retryable_rows: # All mutations are either successful or non-retryable now. @@ -428,15 +421,13 @@ def _do_mutate_retryable_rows(self): num_retryable_responses = 0 for response in responses: - index = 0 for entry in response.entries: - index = self._next_retryable_row_index(index) + index = index_into_all_rows[entry.index] self.responses_statuses[index] = entry.status if self._is_retryable(entry.status): num_retryable_responses += 1 if entry.status.code == 0: self.rows[index].clear() - index += 1 if num_retryable_responses: raise _MutateRowsRetryableError() diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 70ba5780919f..946fb0d5b181 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -623,23 +623,6 @@ def _make_responses_statuses(self, codes): response = [Status(code=code) for code in codes] return response - def test_next_retryable_row_index_empty_rows(self): - worker = self._make_worker(mock.MagicMock(), mock.MagicMock(), []) - worker.responses_statuses = self._make_responses_statuses([]) - self.assertEqual(worker._next_retryable_row_index(0), -1) - self.assertEqual(worker._next_retryable_row_index(1), -1) - - def test_next_retryable_row_index(self): - worker = self._make_worker(mock.MagicMock(), mock.MagicMock(), []) - worker.responses_statuses = self._make_responses_statuses( - [4, 10, 14, 0, 4, 1]) - self.assertEqual(worker._next_retryable_row_index(0), 0) - self.assertEqual(worker._next_retryable_row_index(1), 1) - self.assertEqual(worker._next_retryable_row_index(2), 2) - self.assertEqual(worker._next_retryable_row_index(3), 4) - self.assertEqual(worker._next_retryable_row_index(4), 4) - self.assertEqual(worker._next_retryable_row_index(5), -1) - def test_callable_empty_rows(self): client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) @@ -736,7 +719,7 @@ def test_do_mutate_retryable_rows_retry(self): status=Status(code=4), ), MutateRowsResponse.Entry( - index=1, + index=2, status=Status(code=1), ), ], @@ -793,7 +776,7 @@ def test_do_mutate_retryable_rows_second_try(self): status=Status(code=1), ), MutateRowsResponse.Entry( - index=0, + index=1, status=Status(code=0), ), ], From 94cd58421b37c363c4dfc5b7930e38799c6396b0 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 11 Oct 2017 18:07:55 -0700 Subject: [PATCH 09/17] Include retryable responses in the exception raised. --- bigtable/google/cloud/bigtable/table.py | 14 +++-- bigtable/tests/unit/test_table.py | 82 ++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 8 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 23bcf05d0172..e3f5fb46dc87 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -377,7 +377,10 @@ def sample_row_keys(self): class _MutateRowsRetryableError(Exception): """A retryable error in Mutate Rows response.""" - pass + + def __init__(self, retryable_responses): + super(_MutateRowsRetryableError, self).__init__() + self.retryable_responses = retryable_responses class _RetryableMutateRowsWorker(object): @@ -419,18 +422,19 @@ def _do_mutate_retryable_rows(self): responses = self.client._data_stub.MutateRows( mutate_rows_request) - num_retryable_responses = 0 + retryable_responses = [] for response in responses: for entry in response.entries: index = index_into_all_rows[entry.index] self.responses_statuses[index] = entry.status if self._is_retryable(entry.status): - num_retryable_responses += 1 + entry.index = index # Override with index into all rows + retryable_responses.append(entry) if entry.status.code == 0: self.rows[index].clear() - if num_retryable_responses: - raise _MutateRowsRetryableError() + if retryable_responses: + raise _MutateRowsRetryableError(retryable_responses) return self.responses_statuses diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 946fb0d5b181..85d8fc8798cf 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -478,7 +478,7 @@ def test_retryable_mutate_rows_retry(self): response = [Status(code=0), Status(code=1)] - mock_worker = mock.Mock(side_effect=[_MutateRowsRetryableError, response]) + mock_worker = mock.Mock(side_effect=[_MutateRowsRetryableError([]), response]) with mock.patch( 'google.cloud.bigtable.table._RetryableMutateRowsWorker', new=mock.MagicMock(return_value=mock_worker)): @@ -498,7 +498,7 @@ def test_retryable_mutate_rows_retry_timeout(self): response = [Status(code=0), Status(code=4)] mock_worker = mock.Mock( - side_effect=[_MutateRowsRetryableError, _MutateRowsRetryableError], + side_effect=[_MutateRowsRetryableError([]), _MutateRowsRetryableError([])], responses_statuses=response) # total_timeout_millis = 5 * 60 * 1000 mock_time = mock.Mock(side_effect=[0, 2000, 5 * 60 * 1000]) @@ -731,14 +731,90 @@ def test_do_mutate_retryable_rows_retry(self): worker = self._make_worker(table._instance._client, table.name, [row_1, row_2, row_3]) - with self.assertRaises(_MutateRowsRetryableError): + with self.assertRaises(_MutateRowsRetryableError) as cm: worker._do_mutate_retryable_rows() + + err = cm.exception + self.assertEqual(len(err.retryable_responses), 1) + self.assertEqual(err.retryable_responses[0].index, 1) + self.assertEqual(err.retryable_responses[0].status.code, 4) + statuses = worker.responses_statuses result = [status.code for status in statuses] expected_result = [0, 4, 1] self.assertEqual(result, expected_result) + def test_do_mutate_retryable_rows_second_retry(self): + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.table import _MutateRowsRetryableError + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + from tests.unit._testing import _FakeStub + + # Setup: + # - Mutate 4 rows. + # - First try results: + # [success, retryable, non-retryable, retryable] + # Action: + # - Second try should re-attempt the 'retryable' rows. + # Expectation: + # - After second try: + # [success, success, non-retryable, retryable] + # - One of the rows tried second time returns retryable error code, + # so expect a raise. + # - Exception contains response whose index should be '3' even though + # only two rows were retried. + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + row_3 = DirectRow(row_key=b'row_key_3', table=table) + row_3.set_cell('cf', b'col', b'value3') + row_4 = DirectRow(row_key=b'row_key_4', table=table) + row_4.set_cell('cf', b'col', b'value4') + + response = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + MutateRowsResponse.Entry( + index=1, + status=Status(code=4), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = _FakeStub([response]) + + worker = self._make_worker(table._instance._client, + table.name, [row_1, row_2, row_3, row_4]) + worker.responses_statuses = self._make_responses_statuses( + [0, 4, 1, 10]) + + with self.assertRaises(_MutateRowsRetryableError) as cm: + worker._do_mutate_retryable_rows() + + err = cm.exception + self.assertEqual(len(err.retryable_responses), 1) + self.assertEqual(err.retryable_responses[0].index, 3) + self.assertEqual(err.retryable_responses[0].status.code, 4) + + statuses = worker.responses_statuses + result = [status.code for status in statuses] + expected_result = [0, 0, 1, 4] + + self.assertEqual(result, expected_result) + def test_do_mutate_retryable_rows_second_try(self): from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) From 21cf7d964048c6b3c28eac724a237babcfb3e940 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 11 Oct 2017 19:36:34 -0700 Subject: [PATCH 10/17] Assign default value for init argument. --- bigtable/google/cloud/bigtable/table.py | 4 ++-- bigtable/tests/unit/test_table.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index e3f5fb46dc87..25ad1dc7979c 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -378,9 +378,9 @@ def sample_row_keys(self): class _MutateRowsRetryableError(Exception): """A retryable error in Mutate Rows response.""" - def __init__(self, retryable_responses): + def __init__(self, retryable_responses=None): super(_MutateRowsRetryableError, self).__init__() - self.retryable_responses = retryable_responses + self.retryable_responses = retryable_responses or [] class _RetryableMutateRowsWorker(object): diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 85d8fc8798cf..cdf9356d8da4 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -478,7 +478,7 @@ def test_retryable_mutate_rows_retry(self): response = [Status(code=0), Status(code=1)] - mock_worker = mock.Mock(side_effect=[_MutateRowsRetryableError([]), response]) + mock_worker = mock.Mock(side_effect=[_MutateRowsRetryableError, response]) with mock.patch( 'google.cloud.bigtable.table._RetryableMutateRowsWorker', new=mock.MagicMock(return_value=mock_worker)): @@ -498,7 +498,7 @@ def test_retryable_mutate_rows_retry_timeout(self): response = [Status(code=0), Status(code=4)] mock_worker = mock.Mock( - side_effect=[_MutateRowsRetryableError([]), _MutateRowsRetryableError([])], + side_effect=[_MutateRowsRetryableError, _MutateRowsRetryableError], responses_statuses=response) # total_timeout_millis = 5 * 60 * 1000 mock_time = mock.Mock(side_effect=[0, 2000, 5 * 60 * 1000]) From ca205992300495f9202a31e93f1829bfa579fc82 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Thu, 12 Oct 2017 11:17:08 -0700 Subject: [PATCH 11/17] Change _MILLIS_PER_SECOND to float for better deadline precision. --- bigtable/google/cloud/bigtable/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 25ad1dc7979c..4b1445d986ba 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -39,7 +39,7 @@ # https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest _MAX_BULK_MUTATIONS = 100000 -_MILLIS_PER_SECOND = 1000 +_MILLIS_PER_SECOND = 1000.0 class TableMismatchError(ValueError): From 7ef087feaee39201b365352ab955001955fb60c8 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 13 Oct 2017 00:21:59 -0700 Subject: [PATCH 12/17] Use google.api.core.retry.Retry. Allow user-configurable retry-options. --- api_core/google/api_core/retry.py | 29 ++++ bigtable/google/cloud/bigtable/table.py | 48 +++--- bigtable/tests/unit/test_table.py | 191 ++++++++++++++++++------ 3 files changed, 196 insertions(+), 72 deletions(-) diff --git a/api_core/google/api_core/retry.py b/api_core/google/api_core/retry.py index ccbb883ab13e..64e440280026 100644 --- a/api_core/google/api_core/retry.py +++ b/api_core/google/api_core/retry.py @@ -56,6 +56,7 @@ def check_if_exists(): from __future__ import unicode_literals +import collections import datetime import functools import logging @@ -201,6 +202,34 @@ def retry_target(target, predicate, sleep_generator, deadline, on_error=None): raise ValueError('Sleep generator stopped yielding sleep values.') +class RetryOptions( + collections.namedtuple( + 'RetryOptions', + ['initial', + 'maximum', + 'multiplier', + 'deadline'])): + # pylint: disable=too-few-public-methods + + def __new__(cls, + initial=_DEFAULT_INITIAL_DELAY, + maximum=_DEFAULT_MAXIMUM_DELAY, + multiplier=_DEFAULT_DELAY_MULTIPLIER, + deadline=_DEFAULT_DEADLINE): + assert isinstance(initial, float), 'should be a float' + assert isinstance(maximum, float), 'should be a float' + assert isinstance(multiplier, float), 'should be a float' + assert isinstance(deadline, float), 'should be a float' + assert initial <= maximum and initial <= deadline + + return super(cls, RetryOptions).__new__( + cls, + initial, + maximum, + multiplier, + deadline) + + @six.python_2_unicode_compatible class Retry(object): """Exponential retry decorator. diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 4b1445d986ba..a7535c549477 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -15,10 +15,12 @@ """User-friendly container for Google Cloud Bigtable Table.""" -import random -import time import six +from google.api.core.exceptions import RetryError +from google.api.core.retry import Retry +from google.api.core.retry import RetryOptions +from google.api.core.retry import if_exception_type from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) @@ -39,8 +41,6 @@ # https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest _MAX_BULK_MUTATIONS = 100000 -_MILLIS_PER_SECOND = 1000.0 - class TableMismatchError(ValueError): """Row from another table.""" @@ -301,7 +301,7 @@ def read_rows(self, start_key=None, end_key=None, limit=None, # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` return PartialRowsData(response_iterator) - def mutate_rows(self, rows): + def mutate_rows(self, rows, retry_options=None): """Mutates multiple rows in bulk. The method tries to update all specified rows. @@ -312,30 +312,17 @@ def mutate_rows(self, rows): :type rows: list :param rows: List or other iterable of :class:`.DirectRow` instances. + :type retry_options: :class:`~google.api.core.retry.RetryOptions` + :param retry_options: (Optional) Retry delay and deadline arguments. + :rtype: list :returns: A list of response statuses (`google.rpc.status_pb2.Status`) corresponding to success or failure of each row mutation sent. These will be in the same order as the `rows`. """ - delay_millis = 1000 - delay_mult = 1.5 - max_delay_millis = 15 * 1000 - total_timeout_millis = 5 * 60 * 1000 - - now = time.time() - deadline = now + total_timeout_millis / _MILLIS_PER_SECOND - retryable_mutate_rows = _RetryableMutateRowsWorker( - self._instance._client, self.name, rows) - while now < deadline: - try: - return retryable_mutate_rows() - except _MutateRowsRetryableError: - to_sleep = random.uniform(0, delay_millis * 2) - time.sleep(to_sleep / _MILLIS_PER_SECOND) - delay_millis = min(delay_millis * delay_mult, max_delay_millis) - now = time.time() - return retryable_mutate_rows.responses_statuses + self._instance._client, self.name, rows, retry_options or RetryOptions()) + return retryable_mutate_rows() def sample_row_keys(self): """Read a sample of row keys in the table. @@ -391,15 +378,26 @@ class _RetryableMutateRowsWorker(object): StatusCode.UNAVAILABLE.value[0], ) - def __init__(self, client, table_name, rows): + def __init__(self, client, table_name, rows, retry_options=None): self.client = client self.table_name = table_name self.rows = rows + self.retry_options = retry_options or RetryOptions() self.responses_statuses = [ None for _ in six.moves.xrange(len(self.rows))] def __call__(self): - return self._do_mutate_retryable_rows() + retry = Retry(predicate=if_exception_type(_MutateRowsRetryableError), + initial=self.retry_options.initial, + maximum=self.retry_options.maximum, + multiplier=self.retry_options.multiplier, + deadline=self.retry_options.deadline) + try: + retry(self.__class__._do_mutate_retryable_rows)(self) + except (RetryError, ValueError) as err: + # Upon timeout or sleep generator error, return responses_statuses + pass + return self.responses_statuses def _is_retryable(self, status): # pylint: disable=no-self-use return (status is None or diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index cdf9356d8da4..ce45cfe58252 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -451,7 +451,7 @@ def test_read_row_still_partial(self): with self.assertRaises(ValueError): self._read_row_helper(chunks, None) - def test_retryable_mutate_rows_no_retry(self): + def test_mutate_rows(self): from google.rpc.status_pb2 import Status instance = mock.MagicMock() @@ -459,7 +459,7 @@ def test_retryable_mutate_rows_no_retry(self): response = [Status(code=0), Status(code=1)] - mock_worker = mock.Mock(side_effect=[response]) + mock_worker = mock.Mock(return_value=response) with mock.patch( 'google.cloud.bigtable.table._RetryableMutateRowsWorker', new=mock.MagicMock(return_value=mock_worker)): @@ -469,49 +469,6 @@ def test_retryable_mutate_rows_no_retry(self): self.assertEqual(result, expected_result) - def test_retryable_mutate_rows_retry(self): - from google.cloud.bigtable.table import _MutateRowsRetryableError - from google.rpc.status_pb2 import Status - - instance = mock.MagicMock() - table = self._make_one(self.TABLE_ID, instance) - - response = [Status(code=0), Status(code=1)] - - mock_worker = mock.Mock(side_effect=[_MutateRowsRetryableError, response]) - with mock.patch( - 'google.cloud.bigtable.table._RetryableMutateRowsWorker', - new=mock.MagicMock(return_value=mock_worker)): - statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()]) - result = [status.code for status in statuses] - expected_result = [0, 1] - - self.assertEqual(result, expected_result) - - def test_retryable_mutate_rows_retry_timeout(self): - from google.cloud.bigtable.table import _MutateRowsRetryableError - from google.rpc.status_pb2 import Status - - instance = mock.MagicMock() - table = self._make_one(self.TABLE_ID, instance) - - response = [Status(code=0), Status(code=4)] - - mock_worker = mock.Mock( - side_effect=[_MutateRowsRetryableError, _MutateRowsRetryableError], - responses_statuses=response) - # total_timeout_millis = 5 * 60 * 1000 - mock_time = mock.Mock(side_effect=[0, 2000, 5 * 60 * 1000]) - with mock.patch( - 'google.cloud.bigtable.table._RetryableMutateRowsWorker', - new=mock.MagicMock(return_value=mock_worker)), mock.patch( - 'time.time', new=mock_time): - statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()]) - result = [status.code for status in statuses] - expected_result = [0, 4] - - self.assertEqual(result, expected_result) - def test_read_rows(self): from google.cloud._testing import _Monkey from tests.unit._testing import _FakeStub @@ -633,6 +590,143 @@ def test_callable_empty_rows(self): self.assertEqual(len(statuses), 0) + def test_callable_retry(self): + from google.api.core.retry import RetryOptions + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + + # Setup: + # - Mutate 3 rows. + # Action: + # - Initial attempt will mutate all 3 rows. + # Expectation: + # - First attempt will result in one retryable error. + # - Second attempt will result in success for the retry-ed row. + # - Check MutateRows is called twice. + # - State of responses_statuses should be + # [success, success, non-retryable] + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + row_3 = DirectRow(row_key=b'row_key_3', table=table) + row_3.set_cell('cf', b'col', b'value3') + + response_1 = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + MutateRowsResponse.Entry( + index=1, + status=Status(code=4), + ), + MutateRowsResponse.Entry( + index=2, + status=Status(code=1), + ), + ], + ) + + response_2 = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = mock.MagicMock() + client._data_stub.MutateRows.side_effect = [[response_1], [response_2]] + + retry_options = RetryOptions(initial=0.1) + worker = self._make_worker(client, + table.name, [row_1, row_2, row_3], retry_options) + statuses = worker() + + result = [status.code for status in statuses] + expected_result = [0, 0, 1] + + client._data_stub.MutateRows.assert_has_calls([mock.call(mock.ANY), mock.call(mock.ANY)]) + self.assertEqual(client._data_stub.MutateRows.call_count, 2) + self.assertEqual(result, expected_result) + + def test_callable_retry_timeout(self): + from google.api.core.retry import RetryOptions + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + + # Setup: + # - Mutate 2 rows. + # Action: + # - Initial attempt will mutate all 2 rows. + # Expectation: + # - Both rows always return retryable errors. + # - google.api.core.Retry should keep retrying. + # - Check MutateRows is called multiple times. + # - By the time deadline is reached, statuses should be + # [retryable, retryable] + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + + response = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=4), + ), + MutateRowsResponse.Entry( + index=1, + status=Status(code=4), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = mock.MagicMock() + client._data_stub.MutateRows.return_value = [response] + + retry_options = RetryOptions(initial=0.1, maximum=0.2, + multiplier=2.0, deadline=0.5) + worker = self._make_worker(client, + table.name, [row_1, row_2], retry_options) + statuses = worker() + + result = [status.code for status in statuses] + expected_result = [4, 4] + + self.assertTrue(client._data_stub.MutateRows.call_count > 1) + self.assertEqual(result, expected_result) + + def test_do_mutate_retryable_rows_empty_rows(self): + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + worker = self._make_worker(table._instance._client, table.name, []) + statuses = worker._do_mutate_retryable_rows() + + self.assertEqual(len(statuses), 0) + def test_do_mutate_retryable_rows(self): from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) @@ -672,9 +766,10 @@ def test_do_mutate_retryable_rows(self): # Patch the stub used by the API method. client._data_stub = _FakeStub([response]) - worker = self._make_worker(table._instance._client, table.name, [row_1, row_2]) - + worker = self._make_worker(table._instance._client, + table.name, [row_1, row_2]) statuses = worker._do_mutate_retryable_rows() + result = [status.code for status in statuses] expected_result = [0, 1] @@ -867,6 +962,7 @@ def test_do_mutate_retryable_rows_second_try(self): [0, 4, 1, 10]) statuses = worker._do_mutate_retryable_rows() + result = [status.code for status in statuses] expected_result = [0, 1, 1, 0] @@ -902,6 +998,7 @@ def test_do_mutate_retryable_rows_second_try_no_retryable(self): [0, 1]) statuses = worker._do_mutate_retryable_rows() + result = [status.code for status in statuses] expected_result = [0, 1] From ba6aa7b77c2228b930cffe923785a89bf45a1b03 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 13 Oct 2017 14:02:40 -0700 Subject: [PATCH 13/17] Revert retry.py. Modify mutate_rows api to take a Retry instead with a DEFAULT RETRY. --- api_core/google/api_core/retry.py | 29 ---------------- bigtable/google/cloud/bigtable/table.py | 45 +++++++++++++------------ bigtable/tests/unit/test_table.py | 22 ++++++------ 3 files changed, 34 insertions(+), 62 deletions(-) diff --git a/api_core/google/api_core/retry.py b/api_core/google/api_core/retry.py index 64e440280026..ccbb883ab13e 100644 --- a/api_core/google/api_core/retry.py +++ b/api_core/google/api_core/retry.py @@ -56,7 +56,6 @@ def check_if_exists(): from __future__ import unicode_literals -import collections import datetime import functools import logging @@ -202,34 +201,6 @@ def retry_target(target, predicate, sleep_generator, deadline, on_error=None): raise ValueError('Sleep generator stopped yielding sleep values.') -class RetryOptions( - collections.namedtuple( - 'RetryOptions', - ['initial', - 'maximum', - 'multiplier', - 'deadline'])): - # pylint: disable=too-few-public-methods - - def __new__(cls, - initial=_DEFAULT_INITIAL_DELAY, - maximum=_DEFAULT_MAXIMUM_DELAY, - multiplier=_DEFAULT_DELAY_MULTIPLIER, - deadline=_DEFAULT_DEADLINE): - assert isinstance(initial, float), 'should be a float' - assert isinstance(maximum, float), 'should be a float' - assert isinstance(multiplier, float), 'should be a float' - assert isinstance(deadline, float), 'should be a float' - assert initial <= maximum and initial <= deadline - - return super(cls, RetryOptions).__new__( - cls, - initial, - maximum, - multiplier, - deadline) - - @six.python_2_unicode_compatible class Retry(object): """Exponential retry decorator. diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index a7535c549477..634df2009fbd 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -19,7 +19,6 @@ from google.api.core.exceptions import RetryError from google.api.core.retry import Retry -from google.api.core.retry import RetryOptions from google.api.core.retry import if_exception_type from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( @@ -50,6 +49,22 @@ class TooManyMutationsError(ValueError): """The number of mutations for bulk request is too big.""" +class _MutateRowsRetryableError(Exception): + """A retryable error in Mutate Rows response.""" + + def __init__(self, retryable_responses=None): + super(_MutateRowsRetryableError, self).__init__() + self.retryable_responses = retryable_responses or [] + + +MUTATE_ROWS_DEFAULT_RETRY = Retry( + predicate=if_exception_type(_MutateRowsRetryableError), + initial=1.0, + maximum=15.0, + multiplier=2.0, + deadline=60.0 * 2.0) + + class Table(object): """Representation of a Google Cloud Bigtable Table. @@ -301,7 +316,7 @@ def read_rows(self, start_key=None, end_key=None, limit=None, # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` return PartialRowsData(response_iterator) - def mutate_rows(self, rows, retry_options=None): + def mutate_rows(self, rows, retry=MUTATE_ROWS_DEFAULT_RETRY): """Mutates multiple rows in bulk. The method tries to update all specified rows. @@ -312,8 +327,8 @@ def mutate_rows(self, rows, retry_options=None): :type rows: list :param rows: List or other iterable of :class:`.DirectRow` instances. - :type retry_options: :class:`~google.api.core.retry.RetryOptions` - :param retry_options: (Optional) Retry delay and deadline arguments. + :type retry: :class:`~google.api.core.retry.Retry` + :param retry: (Optional) Retry delay and deadline arguments. :rtype: list :returns: A list of response statuses (`google.rpc.status_pb2.Status`) @@ -321,8 +336,8 @@ def mutate_rows(self, rows, retry_options=None): sent. These will be in the same order as the `rows`. """ retryable_mutate_rows = _RetryableMutateRowsWorker( - self._instance._client, self.name, rows, retry_options or RetryOptions()) - return retryable_mutate_rows() + self._instance._client, self.name, rows) + return retryable_mutate_rows(retry=retry) def sample_row_keys(self): """Read a sample of row keys in the table. @@ -362,14 +377,6 @@ def sample_row_keys(self): return response_iterator -class _MutateRowsRetryableError(Exception): - """A retryable error in Mutate Rows response.""" - - def __init__(self, retryable_responses=None): - super(_MutateRowsRetryableError, self).__init__() - self.retryable_responses = retryable_responses or [] - - class _RetryableMutateRowsWorker(object): # pylint: disable=unsubscriptable-object RETRY_CODES = ( @@ -378,20 +385,14 @@ class _RetryableMutateRowsWorker(object): StatusCode.UNAVAILABLE.value[0], ) - def __init__(self, client, table_name, rows, retry_options=None): + def __init__(self, client, table_name, rows): self.client = client self.table_name = table_name self.rows = rows - self.retry_options = retry_options or RetryOptions() self.responses_statuses = [ None for _ in six.moves.xrange(len(self.rows))] - def __call__(self): - retry = Retry(predicate=if_exception_type(_MutateRowsRetryableError), - initial=self.retry_options.initial, - maximum=self.retry_options.maximum, - multiplier=self.retry_options.multiplier, - deadline=self.retry_options.deadline) + def __call__(self, retry=MUTATE_ROWS_DEFAULT_RETRY): try: retry(self.__class__._do_mutate_retryable_rows)(self) except (RetryError, ValueError) as err: diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index ce45cfe58252..b1f5d8d3d4aa 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -591,10 +591,11 @@ def test_callable_empty_rows(self): self.assertEqual(len(statuses), 0) def test_callable_retry(self): - from google.api.core.retry import RetryOptions + from google.api.core.retry import Retry from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow + from google.cloud.bigtable.table import MUTATE_ROWS_DEFAULT_RETRY from google.rpc.status_pb2 import Status # Setup: @@ -649,10 +650,9 @@ def test_callable_retry(self): client._data_stub = mock.MagicMock() client._data_stub.MutateRows.side_effect = [[response_1], [response_2]] - retry_options = RetryOptions(initial=0.1) - worker = self._make_worker(client, - table.name, [row_1, row_2, row_3], retry_options) - statuses = worker() + retry = MUTATE_ROWS_DEFAULT_RETRY.with_delay(initial=0.1) + worker = self._make_worker(client, table.name, [row_1, row_2, row_3]) + statuses = worker(retry=retry) result = [status.code for status in statuses] expected_result = [0, 0, 1] @@ -662,10 +662,11 @@ def test_callable_retry(self): self.assertEqual(result, expected_result) def test_callable_retry_timeout(self): - from google.api.core.retry import RetryOptions + from google.api.core.retry import Retry from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow + from google.cloud.bigtable.table import MUTATE_ROWS_DEFAULT_RETRY from google.rpc.status_pb2 import Status # Setup: @@ -705,11 +706,10 @@ def test_callable_retry_timeout(self): client._data_stub = mock.MagicMock() client._data_stub.MutateRows.return_value = [response] - retry_options = RetryOptions(initial=0.1, maximum=0.2, - multiplier=2.0, deadline=0.5) - worker = self._make_worker(client, - table.name, [row_1, row_2], retry_options) - statuses = worker() + retry = MUTATE_ROWS_DEFAULT_RETRY.with_delay( + initial=0.1, maximum=0.2, multiplier=2.0).with_deadline(0.5) + worker = self._make_worker(client, table.name, [row_1, row_2]) + statuses = worker(retry=retry) result = [status.code for status in statuses] expected_result = [4, 4] From 761cf84289898e13e8229f5ae9518ea6e27563b9 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Mon, 16 Oct 2017 18:16:10 -0700 Subject: [PATCH 14/17] Use google.api.core.exceptions for internal retryable worker instead of creating a custom exception. Also update documentation. --- bigtable/google/cloud/bigtable/table.py | 77 +++++++++++++++++-------- bigtable/tests/unit/test_table.py | 26 +++------ 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 634df2009fbd..339a11f8e3d9 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -18,6 +18,10 @@ import six from google.api.core.exceptions import RetryError +from google.api.core.exceptions import Aborted +from google.api.core.exceptions import DeadlineExceeded +from google.api.core.exceptions import ServiceUnavailable +from google.api.core.exceptions import from_grpc_status from google.api.core.retry import Retry from google.api.core.retry import if_exception_type from google.cloud._helpers import _to_bytes @@ -40,6 +44,15 @@ # https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest _MAX_BULK_MUTATIONS = 100000 +DEFAULT_RETRY = Retry( + predicate=if_exception_type((Aborted, + DeadlineExceeded, + ServiceUnavailable)), + initial=1.0, + maximum=15.0, + multiplier=2.0, + deadline=60.0 * 2.0) + class TableMismatchError(ValueError): """Row from another table.""" @@ -49,22 +62,6 @@ class TooManyMutationsError(ValueError): """The number of mutations for bulk request is too big.""" -class _MutateRowsRetryableError(Exception): - """A retryable error in Mutate Rows response.""" - - def __init__(self, retryable_responses=None): - super(_MutateRowsRetryableError, self).__init__() - self.retryable_responses = retryable_responses or [] - - -MUTATE_ROWS_DEFAULT_RETRY = Retry( - predicate=if_exception_type(_MutateRowsRetryableError), - initial=1.0, - maximum=15.0, - multiplier=2.0, - deadline=60.0 * 2.0) - - class Table(object): """Representation of a Google Cloud Bigtable Table. @@ -316,19 +313,23 @@ def read_rows(self, start_key=None, end_key=None, limit=None, # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` return PartialRowsData(response_iterator) - def mutate_rows(self, rows, retry=MUTATE_ROWS_DEFAULT_RETRY): + def mutate_rows(self, rows, retry=DEFAULT_RETRY): """Mutates multiple rows in bulk. The method tries to update all specified rows. If some of the rows weren't updated, it would not remove mutations. They can be applied to the row separately. If row mutations finished successfully, they would be cleaned up. + Optionally specify a `retry` to re-attempt rows that return transient + errors, until all rows succeed or the deadline is reached. :type rows: list :param rows: List or other iterable of :class:`.DirectRow` instances. :type retry: :class:`~google.api.core.retry.Retry` - :param retry: (Optional) Retry delay and deadline arguments. + :param retry: (Optional) Retry delay and deadline arguments. Can be + specified using ``DEFAULT_RETRY.with_delay`` and/or + ``DEFAULT_RETRY.with_deadline``. :rtype: list :returns: A list of response statuses (`google.rpc.status_pb2.Status`) @@ -378,6 +379,13 @@ def sample_row_keys(self): class _RetryableMutateRowsWorker(object): + """A callable worker that can retry to mutate rows with transient errors. + + This class is a callable that can retry mutating rows that result in + transient errors. After all rows are successful or none of the rows + are retryable, any subsequent call on this callable will be a no-op. + """ + # pylint: disable=unsubscriptable-object RETRY_CODES = ( StatusCode.DEADLINE_EXCEEDED.value[0], @@ -392,7 +400,17 @@ def __init__(self, client, table_name, rows): self.responses_statuses = [ None for _ in six.moves.xrange(len(self.rows))] - def __call__(self, retry=MUTATE_ROWS_DEFAULT_RETRY): + def __call__(self, retry=DEFAULT_RETRY): + """Attempt to mutate all rows and retry rows with transient errors. + + Will retry the rows with transient errors until all rows succeed or + ``deadline`` specified in the `retry` is reached. + + :rtype: list + :returns: A list of response statuses (`google.rpc.status_pb2.Status`) + corresponding to success or failure of each row mutation + sent. These will be in the same order as the ``rows``. + """ try: retry(self.__class__._do_mutate_retryable_rows)(self) except (RetryError, ValueError) as err: @@ -405,6 +423,17 @@ def _is_retryable(self, status): # pylint: disable=no-self-use status.code in _RetryableMutateRowsWorker.RETRY_CODES) def _do_mutate_retryable_rows(self): + """Mutate all the rows that are eligible for retry. + + A row is eligible for retry if it has not been tried or if it resulted + in a transient error in a previous call. + + :rtype: list + :return: ``responses_statuses`` (`google.rpc.status_pb2.Status`) + :raises: :exc:`~google.api.core.exceptions.ServiceUnavailable` if any + row returned a transient error. An artificial exception + to work with ``DEFAULT_RETRY``. + """ retryable_rows = [] index_into_all_rows = [] for i, status in enumerate(self.responses_statuses): @@ -421,19 +450,19 @@ def _do_mutate_retryable_rows(self): responses = self.client._data_stub.MutateRows( mutate_rows_request) - retryable_responses = [] + num_retryable_responses = 0 for response in responses: for entry in response.entries: index = index_into_all_rows[entry.index] self.responses_statuses[index] = entry.status if self._is_retryable(entry.status): - entry.index = index # Override with index into all rows - retryable_responses.append(entry) + num_retryable_responses += 1 if entry.status.code == 0: self.rows[index].clear() - if retryable_responses: - raise _MutateRowsRetryableError(retryable_responses) + if num_retryable_responses: + raise from_grpc_status(StatusCode.UNAVAILABLE, + 'MutateRows retryable error.') return self.responses_statuses diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index b1f5d8d3d4aa..6c476c2c31e6 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -595,7 +595,7 @@ def test_callable_retry(self): from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import MUTATE_ROWS_DEFAULT_RETRY + from google.cloud.bigtable.table import DEFAULT_RETRY from google.rpc.status_pb2 import Status # Setup: @@ -650,7 +650,7 @@ def test_callable_retry(self): client._data_stub = mock.MagicMock() client._data_stub.MutateRows.side_effect = [[response_1], [response_2]] - retry = MUTATE_ROWS_DEFAULT_RETRY.with_delay(initial=0.1) + retry = DEFAULT_RETRY.with_delay(initial=0.1) worker = self._make_worker(client, table.name, [row_1, row_2, row_3]) statuses = worker(retry=retry) @@ -666,7 +666,7 @@ def test_callable_retry_timeout(self): from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import MUTATE_ROWS_DEFAULT_RETRY + from google.cloud.bigtable.table import DEFAULT_RETRY from google.rpc.status_pb2 import Status # Setup: @@ -706,7 +706,7 @@ def test_callable_retry_timeout(self): client._data_stub = mock.MagicMock() client._data_stub.MutateRows.return_value = [response] - retry = MUTATE_ROWS_DEFAULT_RETRY.with_delay( + retry = DEFAULT_RETRY.with_delay( initial=0.1, maximum=0.2, multiplier=2.0).with_deadline(0.5) worker = self._make_worker(client, table.name, [row_1, row_2]) statuses = worker(retry=retry) @@ -776,9 +776,9 @@ def test_do_mutate_retryable_rows(self): self.assertEqual(result, expected_result) def test_do_mutate_retryable_rows_retry(self): + from google.api.core.exceptions import ServiceUnavailable from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) - from google.cloud.bigtable.table import _MutateRowsRetryableError from google.cloud.bigtable.row import DirectRow from google.rpc.status_pb2 import Status from tests.unit._testing import _FakeStub @@ -826,14 +826,9 @@ def test_do_mutate_retryable_rows_retry(self): worker = self._make_worker(table._instance._client, table.name, [row_1, row_2, row_3]) - with self.assertRaises(_MutateRowsRetryableError) as cm: + with self.assertRaises(ServiceUnavailable): worker._do_mutate_retryable_rows() - err = cm.exception - self.assertEqual(len(err.retryable_responses), 1) - self.assertEqual(err.retryable_responses[0].index, 1) - self.assertEqual(err.retryable_responses[0].status.code, 4) - statuses = worker.responses_statuses result = [status.code for status in statuses] expected_result = [0, 4, 1] @@ -841,9 +836,9 @@ def test_do_mutate_retryable_rows_retry(self): self.assertEqual(result, expected_result) def test_do_mutate_retryable_rows_second_retry(self): + from google.api.core.exceptions import ServiceUnavailable from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) - from google.cloud.bigtable.table import _MutateRowsRetryableError from google.cloud.bigtable.row import DirectRow from google.rpc.status_pb2 import Status from tests.unit._testing import _FakeStub @@ -896,14 +891,9 @@ def test_do_mutate_retryable_rows_second_retry(self): worker.responses_statuses = self._make_responses_statuses( [0, 4, 1, 10]) - with self.assertRaises(_MutateRowsRetryableError) as cm: + with self.assertRaises(ServiceUnavailable): worker._do_mutate_retryable_rows() - err = cm.exception - self.assertEqual(len(err.retryable_responses), 1) - self.assertEqual(err.retryable_responses[0].index, 3) - self.assertEqual(err.retryable_responses[0].status.code, 4) - statuses = worker.responses_statuses result = [status.code for status in statuses] expected_result = [0, 0, 1, 4] From f04213f775aea90680c65468c392db4d27524081 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Mon, 16 Oct 2017 20:38:28 -0700 Subject: [PATCH 15/17] Add sanity check for rows requested to mutate and number of responses coming back. --- bigtable/google/cloud/bigtable/table.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 339a11f8e3d9..224cd0e1e10b 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -450,9 +450,11 @@ def _do_mutate_retryable_rows(self): responses = self.client._data_stub.MutateRows( mutate_rows_request) + num_responses = 0 num_retryable_responses = 0 for response in responses: for entry in response.entries: + num_responses += 1 index = index_into_all_rows[entry.index] self.responses_statuses[index] = entry.status if self._is_retryable(entry.status): @@ -460,6 +462,8 @@ def _do_mutate_retryable_rows(self): if entry.status.code == 0: self.rows[index].clear() + assert len(retryable_rows) == num_responses + if num_retryable_responses: raise from_grpc_status(StatusCode.UNAVAILABLE, 'MutateRows retryable error.') From 0d5c751230800132ac943725ee8a7cb0d284ba43 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Tue, 17 Oct 2017 10:04:44 -0700 Subject: [PATCH 16/17] Add test for assertion check to ensure num of responses matches num of rows requested. --- bigtable/tests/unit/test_table.py | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index 6c476c2c31e6..a3ec0dd6d07e 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -994,6 +994,39 @@ def test_do_mutate_retryable_rows_second_try_no_retryable(self): self.assertEqual(result, expected_result) + def test_do_mutate_retryable_rows_mismatch_num_responses(self): + from google.cloud.bigtable._generated.bigtable_pb2 import ( + MutateRowsResponse) + from google.cloud.bigtable.row import DirectRow + from google.rpc.status_pb2 import Status + from tests.unit._testing import _FakeStub + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + table = self._make_table(self.TABLE_ID, instance) + + row_1 = DirectRow(row_key=b'row_key', table=table) + row_1.set_cell('cf', b'col', b'value1') + row_2 = DirectRow(row_key=b'row_key_2', table=table) + row_2.set_cell('cf', b'col', b'value2') + + response = MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=0, + status=Status(code=0), + ), + ], + ) + + # Patch the stub used by the API method. + client._data_stub = _FakeStub([response]) + + worker = self._make_worker(table._instance._client, + table.name, [row_1, row_2]) + with self.assertRaises(AssertionError): + statuses = worker._do_mutate_retryable_rows() + class Test__create_row_request(unittest.TestCase): From d7984d0bdb74b727dba19c3cb2ed09b98fff86b1 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 25 Oct 2017 14:31:18 -0700 Subject: [PATCH 17/17] Refactor google.api.core to google.api_core. --- bigtable/google/cloud/bigtable/table.py | 18 +++++++++--------- bigtable/tests/unit/test_table.py | 10 +++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index 224cd0e1e10b..92f7b7d090ba 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -17,13 +17,13 @@ import six -from google.api.core.exceptions import RetryError -from google.api.core.exceptions import Aborted -from google.api.core.exceptions import DeadlineExceeded -from google.api.core.exceptions import ServiceUnavailable -from google.api.core.exceptions import from_grpc_status -from google.api.core.retry import Retry -from google.api.core.retry import if_exception_type +from google.api_core.exceptions import RetryError +from google.api_core.exceptions import Aborted +from google.api_core.exceptions import DeadlineExceeded +from google.api_core.exceptions import ServiceUnavailable +from google.api_core.exceptions import from_grpc_status +from google.api_core.retry import Retry +from google.api_core.retry import if_exception_type from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) @@ -326,7 +326,7 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY): :type rows: list :param rows: List or other iterable of :class:`.DirectRow` instances. - :type retry: :class:`~google.api.core.retry.Retry` + :type retry: :class:`~google.api_core.retry.Retry` :param retry: (Optional) Retry delay and deadline arguments. Can be specified using ``DEFAULT_RETRY.with_delay`` and/or ``DEFAULT_RETRY.with_deadline``. @@ -430,7 +430,7 @@ def _do_mutate_retryable_rows(self): :rtype: list :return: ``responses_statuses`` (`google.rpc.status_pb2.Status`) - :raises: :exc:`~google.api.core.exceptions.ServiceUnavailable` if any + :raises: :exc:`~google.api_core.exceptions.ServiceUnavailable` if any row returned a transient error. An artificial exception to work with ``DEFAULT_RETRY``. """ diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index a3ec0dd6d07e..0567c8d28710 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -591,7 +591,7 @@ def test_callable_empty_rows(self): self.assertEqual(len(statuses), 0) def test_callable_retry(self): - from google.api.core.retry import Retry + from google.api_core.retry import Retry from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow @@ -662,7 +662,7 @@ def test_callable_retry(self): self.assertEqual(result, expected_result) def test_callable_retry_timeout(self): - from google.api.core.retry import Retry + from google.api_core.retry import Retry from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow @@ -675,7 +675,7 @@ def test_callable_retry_timeout(self): # - Initial attempt will mutate all 2 rows. # Expectation: # - Both rows always return retryable errors. - # - google.api.core.Retry should keep retrying. + # - google.api_core.Retry should keep retrying. # - Check MutateRows is called multiple times. # - By the time deadline is reached, statuses should be # [retryable, retryable] @@ -776,7 +776,7 @@ def test_do_mutate_retryable_rows(self): self.assertEqual(result, expected_result) def test_do_mutate_retryable_rows_retry(self): - from google.api.core.exceptions import ServiceUnavailable + from google.api_core.exceptions import ServiceUnavailable from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow @@ -836,7 +836,7 @@ def test_do_mutate_retryable_rows_retry(self): self.assertEqual(result, expected_result) def test_do_mutate_retryable_rows_second_retry(self): - from google.api.core.exceptions import ServiceUnavailable + from google.api_core.exceptions import ServiceUnavailable from google.cloud.bigtable._generated.bigtable_pb2 import ( MutateRowsResponse) from google.cloud.bigtable.row import DirectRow