diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index ce318ed91dc9..271de380096c 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1108,10 +1108,10 @@ def query_rows(self, query, job_config=None, job_id=None, timeout=None, :type job_id: str :param job_id: (Optional) ID to use for the query job. - :type timeout: int + :type timeout: float :param timeout: - (Optional) How long to wait for job to complete before raising a - :class:`TimeoutError`. + (Optional) How long (in seconds) to wait for job to complete + before raising a :class:`TimeoutError`. :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 350ad7ce579b..a11bdd42897a 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -44,6 +44,7 @@ _DONE_STATE = 'DONE' _STOPPED_REASON = 'stopped' +_TIMEOUT_BUFFER_SECS = 0.1 _ERROR_REASON_TO_EXCEPTION = { 'accessDenied': http_client.FORBIDDEN, @@ -1522,6 +1523,7 @@ def __init__(self, job_id, query, client, job_config=None): self.query = query self._configuration = job_config self._query_results = None + self._done_timeout = None @property def allow_large_results(self): @@ -1871,7 +1873,7 @@ def query_results(self, retry=DEFAULT_RETRY): """ if not self._query_results: self._query_results = self._client._get_query_results( - self.job_id, retry) + self.job_id, retry, project=self.project) return self._query_results def done(self, retry=DEFAULT_RETRY): @@ -1880,11 +1882,25 @@ def done(self, retry=DEFAULT_RETRY): :rtype: bool :returns: True if the job is complete, False otherwise. """ + # Since the API to getQueryResults can hang up to the timeout value + # (default of 10 seconds), set the timeout parameter to ensure that + # the timeout from the futures API is respected. 
See: + # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135 + timeout_ms = None + if self._done_timeout is not None: + # Subtract a buffer for context switching, network latency, etc. + timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS + timeout = max(min(timeout, 10), 0) + self._done_timeout -= timeout + self._done_timeout = max(0, self._done_timeout) + timeout_ms = int(timeout * 1000) + # Do not refresh if the state is already done, as the job will not # change once complete. if self.state != _DONE_STATE: self._query_results = self._client._get_query_results( - self.job_id, retry) + self.job_id, retry, + project=self.project, timeout_ms=timeout_ms) # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are @@ -1894,10 +1910,14 @@ def done(self, retry=DEFAULT_RETRY): return self.state == _DONE_STATE + def _blocking_poll(self, timeout=None): + self._done_timeout = timeout + super(QueryJob, self)._blocking_poll(timeout=timeout) + def result(self, timeout=None, retry=DEFAULT_RETRY): """Start the job and wait for it to complete and get the result. - :type timeout: int + :type timeout: float :param timeout: How long (in seconds) to wait for job to complete before raising a :class:`TimeoutError`.
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 0e0b667e704d..9e827537abe4 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2197,6 +2197,34 @@ def test_result_invokes_begins(self): self.assertEqual(query_request['method'], 'GET') self.assertEqual(reload_request['method'], 'GET') + def test_result_w_timeout(self): + begun_resource = self._makeResource() + query_resource = { + 'jobComplete': True, + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_ID, + }, + } + done_resource = copy.deepcopy(begun_resource) + done_resource['status'] = {'state': 'DONE'} + connection = _Connection( + begun_resource, query_resource, done_resource) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + job.result(timeout=1.0) + + self.assertEqual(len(connection._requested), 3) + begin_request, query_request, reload_request = connection._requested + self.assertEqual(begin_request['method'], 'POST') + self.assertEqual(query_request['method'], 'GET') + self.assertEqual( + query_request['path'], + '/projects/{}/queries/{}'.format(self.PROJECT, self.JOB_ID)) + self.assertEqual(query_request['query_params']['timeoutMs'], 900) + self.assertEqual(reload_request['method'], 'GET') + def test_result_error(self): from google.cloud import exceptions