diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py
index d1711f5be704..92f7b7d090ba 100644
--- a/bigtable/google/cloud/bigtable/table.py
+++ b/bigtable/google/cloud/bigtable/table.py
@@ -17,6 +17,13 @@
 
 import six
 
+from google.api_core.exceptions import RetryError
+from google.api_core.exceptions import Aborted
+from google.api_core.exceptions import DeadlineExceeded
+from google.api_core.exceptions import ServiceUnavailable
+from google.api_core.exceptions import from_grpc_status
+from google.api_core.retry import Retry
+from google.api_core.retry import if_exception_type
 from google.cloud._helpers import _to_bytes
 from google.cloud.bigtable._generated import (
     bigtable_pb2 as data_messages_v2_pb2)
@@ -30,12 +37,22 @@
 from google.cloud.bigtable.row import ConditionalRow
 from google.cloud.bigtable.row import DirectRow
 from google.cloud.bigtable.row_data import PartialRowsData
+from grpc import StatusCode
 
 
 # Maximum number of mutations in bulk (MutateRowsRequest message):
 # https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest
 _MAX_BULK_MUTATIONS = 100000
 
+DEFAULT_RETRY = Retry(
+    predicate=if_exception_type((Aborted,
+                                 DeadlineExceeded,
+                                 ServiceUnavailable)),
+    initial=1.0,
+    maximum=15.0,
+    multiplier=2.0,
+    deadline=60.0 * 2.0)
+
 
 class TableMismatchError(ValueError):
     """Row from another table."""
@@ -296,34 +313,33 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
         # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
         return PartialRowsData(response_iterator)
 
-    def mutate_rows(self, rows):
+    def mutate_rows(self, rows, retry=DEFAULT_RETRY):
         """Mutates multiple rows in bulk.
 
         The method tries to update all specified rows.
         If some of the rows weren't updated, it would not remove mutations.
         They can be applied to the row separately.
         If row mutations finished successfully, they would be cleaned up.
 
+        Optionally specify a `retry` to re-attempt rows that return transient
+        errors, until all rows succeed or the deadline is reached.
+
         :type rows: list
         :param rows: List or other iterable of :class:`.DirectRow` instances.
 
+        :type retry: :class:`~google.api_core.retry.Retry`
+        :param retry: (Optional) Retry delay and deadline arguments. Can be
+                      specified using ``DEFAULT_RETRY.with_delay`` and/or
+                      ``DEFAULT_RETRY.with_deadline``.
+
         :rtype: list
         :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
                   corresponding to success or failure of each row mutation
                   sent. These will be in the same order as the `rows`.
         """
-        mutate_rows_request = _mutate_rows_request(self.name, rows)
-        client = self._instance._client
-        responses = client._data_stub.MutateRows(mutate_rows_request)
-
-        responses_statuses = [
-            None for _ in six.moves.xrange(len(mutate_rows_request.entries))]
-        for response in responses:
-            for entry in response.entries:
-                responses_statuses[entry.index] = entry.status
-                if entry.status.code == 0:
-                    rows[entry.index].clear()
-        return responses_statuses
+        retryable_mutate_rows = _RetryableMutateRowsWorker(
+            self._instance._client, self.name, rows)
+        return retryable_mutate_rows(retry=retry)
 
     def sample_row_keys(self):
         """Read a sample of row keys in the table.
@@ -363,6 +379,133 @@ def sample_row_keys(self):
         return response_iterator
 
 
+class _RetryableMutateRowsWorker(object):
+    """A callable worker that can retry to mutate rows with transient errors.
+
+    This class is a callable that can retry mutating rows that result in
+    transient errors. After all rows are successful or none of the rows
+    are retryable, any subsequent call on this callable will be a no-op.
+
+    :param client: Object whose ``_data_stub.MutateRows`` is called to send
+                   each batch of mutations.
+
+    :type table_name: str
+    :param table_name: Name of the table, forwarded to the mutate request.
+
+    :type rows: list
+    :param rows: :class:`.DirectRow` instances to mutate. Must support
+                 ``len()`` and indexing; rows that succeed are cleared.
+    """
+
+    # Numeric status codes that mark a row as eligible for another attempt.
+    # pylint: disable=unsubscriptable-object
+    RETRY_CODES = (
+        StatusCode.DEADLINE_EXCEEDED.value[0],
+        StatusCode.ABORTED.value[0],
+        StatusCode.UNAVAILABLE.value[0],
+    )
+
+    def __init__(self, client, table_name, rows):
+        self.client = client
+        self.table_name = table_name
+        self.rows = rows
+        # ``None`` marks a row that has not been attempted yet.
+        self.responses_statuses = [
+            None for _ in six.moves.xrange(len(self.rows))]
+
+    def __call__(self, retry=DEFAULT_RETRY):
+        """Attempt to mutate all rows and retry rows with transient errors.
+
+        Will retry the rows with transient errors until all rows succeed or
+        ``deadline`` specified in the `retry` is reached.
+
+        :type retry: :class:`~google.api_core.retry.Retry`
+        :param retry: (Optional) Retry policy wrapped around each attempt.
+
+        :rtype: list
+        :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
+                  corresponding to success or failure of each row mutation
+                  sent. These will be in the same order as the ``rows``.
+        """
+        try:
+            retry(self.__class__._do_mutate_retryable_rows)(self)
+        except (RetryError, ValueError):
+            # Upon timeout or sleep generator error, return responses_statuses
+            pass
+        return self.responses_statuses
+
+    def _is_retryable(self, status):  # pylint: disable=no-self-use
+        """Tell whether a row with the given status should be (re)sent.
+
+        :type status: :class:`google.rpc.status_pb2.Status` or ``None``
+        :param status: Last status recorded for a row; ``None`` if the row
+                       has not been attempted yet.
+
+        :rtype: bool
+        :returns: ``True`` for untried rows and for ``RETRY_CODES`` statuses.
+        """
+        return (status is None or
+                status.code in _RetryableMutateRowsWorker.RETRY_CODES)
+
+    def _do_mutate_retryable_rows(self):
+        """Mutate all the rows that are eligible for retry.
+
+        A row is eligible for retry if it has not been tried or if it resulted
+        in a transient error in a previous call.
+
+        :rtype: list
+        :return: ``responses_statuses`` (`google.rpc.status_pb2.Status`)
+        :raises: One of the following:
+
+                 * :exc:`~google.api_core.exceptions.ServiceUnavailable` if
+                   any row returned a transient error. An artificial
+                   exception to work with ``DEFAULT_RETRY``.
+                 * :exc:`RuntimeError` if the number of responses doesn't
+                   match the number of rows that were retried.
+        """
+        retryable_rows = []
+        index_into_all_rows = []
+        for i, status in enumerate(self.responses_statuses):
+            if self._is_retryable(status):
+                retryable_rows.append(self.rows[i])
+                index_into_all_rows.append(i)
+
+        if not retryable_rows:
+            # All mutations are either successful or non-retryable now.
+            return self.responses_statuses
+
+        mutate_rows_request = _mutate_rows_request(
+            self.table_name, retryable_rows)
+        responses = self.client._data_stub.MutateRows(
+            mutate_rows_request)
+
+        num_responses = 0
+        num_retryable_responses = 0
+        for response in responses:
+            for entry in response.entries:
+                num_responses += 1
+                # ``entry.index`` is relative to this request; map it back
+                # to the row's position in ``self.rows``.
+                index = index_into_all_rows[entry.index]
+                self.responses_statuses[index] = entry.status
+                if self._is_retryable(entry.status):
+                    num_retryable_responses += 1
+                if entry.status.code == 0:
+                    self.rows[index].clear()
+
+        # Raise explicitly rather than ``assert`` so the check still runs
+        # under ``python -O``.
+        if len(retryable_rows) != num_responses:
+            raise RuntimeError(
+                'Unexpected number of responses', num_responses,
+                'Expected', len(retryable_rows))
+
+        if num_retryable_responses:
+            raise from_grpc_status(StatusCode.UNAVAILABLE,
+                                   'MutateRows retryable error.')
+        return self.responses_statuses
+
+
 def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
                         filter_=None, limit=None, end_inclusive=False):
     """Creates a request to read rows in a table.
diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py
index 3890d097f572..0567c8d28710 100644
--- a/bigtable/tests/unit/test_table.py
+++ b/bigtable/tests/unit/test_table.py
@@ -452,43 +452,23 @@ def test_read_row_still_partial(self):
         self._read_row_helper(chunks, None)
 
     def test_mutate_rows(self):
-        from google.cloud.bigtable._generated.bigtable_pb2 import (
-            MutateRowsResponse)
-        from google.cloud.bigtable.row import DirectRow
         from google.rpc.status_pb2 import Status
-        from tests.unit._testing import _FakeStub
 
-        client = _Client()
-        instance = _Instance(self.INSTANCE_NAME, client=client)
+        instance = mock.MagicMock()
         table = self._make_one(self.TABLE_ID, instance)
 
-        row_1 = DirectRow(row_key=b'row_key', table=table)
-        row_1.set_cell('cf', b'col', b'value1')
-        row_2 = DirectRow(row_key=b'row_key_2', table=table)
-        row_2.set_cell('cf', b'col', b'value2')
+        response = [Status(code=0), Status(code=1)]
 
-        response = MutateRowsResponse(
-            entries=[
-                MutateRowsResponse.Entry(
-                    index=0,
-                    status=Status(code=0),
-                ),
-                MutateRowsResponse.Entry(
-                    index=1,
-                    status=Status(code=1),
-                ),
-            ],
-        )
-
-        # Patch the stub used by the API method.
-        client._data_stub = _FakeStub([response])
-        statuses = table.mutate_rows([row_1, row_2])
+        mock_worker = mock.Mock(return_value=response)
+        with mock.patch(
+                'google.cloud.bigtable.table._RetryableMutateRowsWorker',
+                new=mock.MagicMock(return_value=mock_worker)):
+            statuses = table.mutate_rows([mock.MagicMock(), mock.MagicMock()])
         result = [status.code for status in statuses]
         expected_result = [0, 1]
 
         self.assertEqual(result, expected_result)
 
-
     def test_read_rows(self):
         from google.cloud._testing import _Monkey
         from tests.unit._testing import _FakeStub
@@ -570,6 +550,479 @@ def test_sample_row_keys(self):
         )])
 
 
+class Test__RetryableMutateRowsWorker(unittest.TestCase):
+    PROJECT_ID = 'project-id'
+    INSTANCE_ID = 'instance-id'
+    INSTANCE_NAME = ('projects/' + PROJECT_ID + '/instances/' + INSTANCE_ID)
+    TABLE_ID = 'table-id'
+
+    @staticmethod
+    def _get_target_class_for_worker():
+        from google.cloud.bigtable.table import _RetryableMutateRowsWorker
+
+        return _RetryableMutateRowsWorker
+
+    def _make_worker(self, *args, **kwargs):
+        return self._get_target_class_for_worker()(*args, **kwargs)
+
+    @staticmethod
+    def _get_target_class_for_table():
+        from google.cloud.bigtable.table import Table
+
+        return Table
+
+    def _make_table(self, *args, **kwargs):
+        return self._get_target_class_for_table()(*args, **kwargs)
+
+    def _make_responses_statuses(self, codes):
+        from google.rpc.status_pb2 import Status
+
+        response = [Status(code=code) for code in codes]
+        return response
+
+    def test_callable_empty_rows(self):
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        worker = self._make_worker(table._instance._client, table.name, [])
+        statuses = worker()
+
+        self.assertEqual(len(statuses), 0)
+
+    def test_callable_retry(self):
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.cloud.bigtable.table import DEFAULT_RETRY
+        from google.rpc.status_pb2 import Status
+
+        # Setup:
+        #   - Mutate 3 rows.
+        # Action:
+        #   - Initial attempt will mutate all 3 rows.
+        # Expectation:
+        #   - First attempt will result in one retryable error.
+        #   - Second attempt will result in success for the retry-ed row.
+        #   - Check MutateRows is called twice.
+        #   - State of responses_statuses should be
+        #       [success, success, non-retryable]
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+        row_3 = DirectRow(row_key=b'row_key_3', table=table)
+        row_3.set_cell('cf', b'col', b'value3')
+
+        response_1 = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=4),
+                ),
+                MutateRowsResponse.Entry(
+                    index=2,
+                    status=Status(code=1),
+                ),
+            ],
+        )
+
+        response_2 = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = mock.MagicMock()
+        client._data_stub.MutateRows.side_effect = [[response_1], [response_2]]
+
+        retry = DEFAULT_RETRY.with_delay(initial=0.1)
+        worker = self._make_worker(client, table.name, [row_1, row_2, row_3])
+        statuses = worker(retry=retry)
+
+        result = [status.code for status in statuses]
+        expected_result = [0, 0, 1]
+
+        client._data_stub.MutateRows.assert_has_calls(
+            [mock.call(mock.ANY), mock.call(mock.ANY)])
+        self.assertEqual(client._data_stub.MutateRows.call_count, 2)
+        self.assertEqual(result, expected_result)
+
+    def test_callable_retry_timeout(self):
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.cloud.bigtable.table import DEFAULT_RETRY
+        from google.rpc.status_pb2 import Status
+
+        # Setup:
+        #   - Mutate 2 rows.
+        # Action:
+        #   - Initial attempt will mutate all 2 rows.
+        # Expectation:
+        #   - Both rows always return retryable errors.
+        #   - google.api_core.Retry should keep retrying.
+        #   - Check MutateRows is called multiple times.
+        #   - By the time deadline is reached, statuses should be
+        #       [retryable, retryable]
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=4),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=4),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = mock.MagicMock()
+        client._data_stub.MutateRows.return_value = [response]
+
+        retry = DEFAULT_RETRY.with_delay(
+            initial=0.1, maximum=0.2, multiplier=2.0).with_deadline(0.5)
+        worker = self._make_worker(client, table.name, [row_1, row_2])
+        statuses = worker(retry=retry)
+
+        result = [status.code for status in statuses]
+        expected_result = [4, 4]
+
+        self.assertTrue(client._data_stub.MutateRows.call_count > 1)
+        self.assertEqual(result, expected_result)
+
+    def test_do_mutate_retryable_rows_empty_rows(self):
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        worker = self._make_worker(table._instance._client, table.name, [])
+        statuses = worker._do_mutate_retryable_rows()
+
+        self.assertEqual(len(statuses), 0)
+
+    def test_do_mutate_retryable_rows(self):
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.rpc.status_pb2 import Status
+        from tests.unit._testing import _FakeStub
+
+        # Setup:
+        #   - Mutate 2 rows.
+        # Action:
+        #   - Initial attempt will mutate all 2 rows.
+        # Expectation:
+        #   - Expect [success, non-retryable]
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=1),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = _FakeStub([response])
+
+        worker = self._make_worker(table._instance._client,
+                                   table.name, [row_1, row_2])
+        statuses = worker._do_mutate_retryable_rows()
+
+        result = [status.code for status in statuses]
+        expected_result = [0, 1]
+
+        self.assertEqual(result, expected_result)
+
+    def test_do_mutate_retryable_rows_retry(self):
+        from google.api_core.exceptions import ServiceUnavailable
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.rpc.status_pb2 import Status
+        from tests.unit._testing import _FakeStub
+
+        # Setup:
+        #   - Mutate 3 rows.
+        # Action:
+        #   - Initial attempt will mutate all 3 rows.
+        # Expectation:
+        #   - Second row returns retryable error code, so expect a raise.
+        #   - State of responses_statuses should be
+        #       [success, retryable, non-retryable]
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+        row_3 = DirectRow(row_key=b'row_key_3', table=table)
+        row_3.set_cell('cf', b'col', b'value3')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=4),
+                ),
+                MutateRowsResponse.Entry(
+                    index=2,
+                    status=Status(code=1),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = _FakeStub([response])
+
+        worker = self._make_worker(table._instance._client,
+                                   table.name, [row_1, row_2, row_3])
+
+        with self.assertRaises(ServiceUnavailable):
+            worker._do_mutate_retryable_rows()
+
+        statuses = worker.responses_statuses
+        result = [status.code for status in statuses]
+        expected_result = [0, 4, 1]
+
+        self.assertEqual(result, expected_result)
+
+    def test_do_mutate_retryable_rows_second_retry(self):
+        from google.api_core.exceptions import ServiceUnavailable
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.rpc.status_pb2 import Status
+        from tests.unit._testing import _FakeStub
+
+        # Setup:
+        #   - Mutate 4 rows.
+        #   - First try results:
+        #       [success, retryable, non-retryable, retryable]
+        # Action:
+        #   - Second try should re-attempt the 'retryable' rows.
+        # Expectation:
+        #   - After second try:
+        #       [success, success, non-retryable, retryable]
+        #   - One of the rows tried second time returns retryable error code,
+        #     so expect a raise.
+        #   - The retryable status lands at index '3' of responses_statuses
+        #     even though only two rows were retried.
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+        row_3 = DirectRow(row_key=b'row_key_3', table=table)
+        row_3.set_cell('cf', b'col', b'value3')
+        row_4 = DirectRow(row_key=b'row_key_4', table=table)
+        row_4.set_cell('cf', b'col', b'value4')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=4),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = _FakeStub([response])
+
+        worker = self._make_worker(table._instance._client,
+                                   table.name, [row_1, row_2, row_3, row_4])
+        worker.responses_statuses = self._make_responses_statuses(
+            [0, 4, 1, 10])
+
+        with self.assertRaises(ServiceUnavailable):
+            worker._do_mutate_retryable_rows()
+
+        statuses = worker.responses_statuses
+        result = [status.code for status in statuses]
+        expected_result = [0, 0, 1, 4]
+
+        self.assertEqual(result, expected_result)
+
+    def test_do_mutate_retryable_rows_second_try(self):
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.rpc.status_pb2 import Status
+        from tests.unit._testing import _FakeStub
+
+        # Setup:
+        #   - Mutate 4 rows.
+        #   - First try results:
+        #       [success, retryable, non-retryable, retryable]
+        # Action:
+        #   - Second try should re-attempt the 'retryable' rows.
+        # Expectation:
+        #   - After second try:
+        #       [success, non-retryable, non-retryable, success]
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+        row_3 = DirectRow(row_key=b'row_key_3', table=table)
+        row_3.set_cell('cf', b'col', b'value3')
+        row_4 = DirectRow(row_key=b'row_key_4', table=table)
+        row_4.set_cell('cf', b'col', b'value4')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=1),
+                ),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(code=0),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = _FakeStub([response])
+
+        worker = self._make_worker(table._instance._client,
+                                   table.name, [row_1, row_2, row_3, row_4])
+        worker.responses_statuses = self._make_responses_statuses(
+            [0, 4, 1, 10])
+
+        statuses = worker._do_mutate_retryable_rows()
+
+        result = [status.code for status in statuses]
+        expected_result = [0, 1, 1, 0]
+
+        self.assertEqual(result, expected_result)
+
+    def test_do_mutate_retryable_rows_second_try_no_retryable(self):
+        from google.cloud.bigtable.row import DirectRow
+
+        # Setup:
+        #   - Mutate 2 rows.
+        #   - First try results: [success, non-retryable]
+        # Action:
+        #   - Second try has no row to retry.
+        # Expectation:
+        #   - After second try: [success, non-retryable]
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+
+        worker = self._make_worker(table._instance._client,
+                                   table.name, [row_1, row_2])
+        worker.responses_statuses = self._make_responses_statuses(
+            [0, 1])
+
+        statuses = worker._do_mutate_retryable_rows()
+
+        result = [status.code for status in statuses]
+        expected_result = [0, 1]
+
+        self.assertEqual(result, expected_result)
+
+    def test_do_mutate_retryable_rows_mismatch_num_responses(self):
+        from google.cloud.bigtable._generated.bigtable_pb2 import (
+            MutateRowsResponse)
+        from google.cloud.bigtable.row import DirectRow
+        from google.rpc.status_pb2 import Status
+        from tests.unit._testing import _FakeStub
+
+        client = _Client()
+        instance = _Instance(self.INSTANCE_NAME, client=client)
+        table = self._make_table(self.TABLE_ID, instance)
+
+        row_1 = DirectRow(row_key=b'row_key', table=table)
+        row_1.set_cell('cf', b'col', b'value1')
+        row_2 = DirectRow(row_key=b'row_key_2', table=table)
+        row_2.set_cell('cf', b'col', b'value2')
+
+        response = MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    index=0,
+                    status=Status(code=0),
+                ),
+            ],
+        )
+
+        # Patch the stub used by the API method.
+        client._data_stub = _FakeStub([response])
+
+        worker = self._make_worker(table._instance._client,
+                                   table.name, [row_1, row_2])
+        with self.assertRaises(RuntimeError):
+            worker._do_mutate_retryable_rows()
+
+
 class Test__create_row_request(unittest.TestCase):
 
     def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None,