diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index b1d589c712c9..e7cc2712ea8a 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -25,9 +25,13 @@ import com.google.cloud.RetryOption; import com.google.cloud.bigquery.BigQuery.JobOption; import com.google.cloud.bigquery.BigQuery.QueryResultsOption; +import com.google.cloud.bigquery.BigQuery.TableDataListOption; import com.google.cloud.bigquery.JobConfiguration.Type; import java.io.IOException; import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -62,11 +66,9 @@ public class Job extends JobInfo { .setMaxRetryDelay(Duration.ofSeconds(3L)) .build(); - static final QueryResultsOption[] DEFAULT_QUERY_WAIT_OPTIONS = - new QueryResultsOption[] { - QueryResultsOption.pageSize(0L), - QueryResultsOption.maxWaitTime(Duration.ofMinutes(1).toMillis()) - }; + static final QueryResultsOption[] DEFAULT_QUERY_WAIT_OPTIONS = { + QueryResultsOption.pageSize(0L), + }; private final BigQueryOptions options; private transient BigQuery bigquery; @@ -272,13 +274,35 @@ public TableResult getQueryResults(QueryResultsOption... options) } TableId table = ((QueryJobConfiguration) getConfiguration()).getDestinationTable(); - // TODO(pongad): merge options? + + List waitOptions = + new ArrayList<>(Arrays.asList(DEFAULT_QUERY_WAIT_OPTIONS)); + List listOptions = new ArrayList<>(); + for (QueryResultsOption option : options) { + switch (option.getRpcOption()) { + case MAX_RESULTS: + listOptions.add(TableDataListOption.pageSize((Long) option.getValue())); + break; + case PAGE_TOKEN: + listOptions.add(TableDataListOption.pageToken((String) option.getValue())); + break; + case START_INDEX: + listOptions.add(TableDataListOption.startIndex((Long) option.getValue())); + break; + case TIMEOUT: + waitOptions.add(QueryResultsOption.maxWaitTime((Long) option.getValue())); + break; + } + } + QueryResponse response = - waitForQueryResults(DEFAULT_JOB_WAIT_SETTINGS, DEFAULT_QUERY_WAIT_OPTIONS); + waitForQueryResults( + DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0])); if (response.getSchema() == null) { throw new JobException(getJobId(), response.getErrors()); } - return bigquery.listTableData(table, response.getSchema()); + return bigquery.listTableData( + table, response.getSchema(), listOptions.toArray(new TableDataListOption[0])); } private QueryResponse waitForQueryResults( diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index bf5ab801aefc..b18ffcccb71a 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -1127,6 +1127,55 @@ public void testQueryRequestCompleted() throws InterruptedException { .setTotalRows(BigInteger.valueOf(1L)) .setSchema(TABLE_SCHEMA.toPb()); + EasyMock.expect( + bigqueryRpcMock.create( + JOB_INFO.toPb(), Collections.emptyMap())) + .andReturn(jobResponsePb); + EasyMock.expect( + bigqueryRpcMock.getQueryResults( + PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + .andReturn(responsePb); + EasyMock.expect( + bigqueryRpcMock.listTableData( + PROJECT, DATASET, TABLE, Collections.emptyMap())) + .andReturn( + new TableDataList() + .setPageToken("") + .setRows(ImmutableList.of(TABLE_ROW)) + .setTotalRows(1L)); + + EasyMock.replay(bigqueryRpcMock); + bigquery = options.getService(); + TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob); + assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA); + assertThat(result.getTotalRows()).isEqualTo(1); + for (FieldValueList row : result.getValues()) { + assertThat(row.get(0).getBooleanValue()).isFalse(); + assertThat(row.get(1).getLongValue()).isEqualTo(1); + } + } + + @Test + public void testQueryRequestCompletedOptions() throws InterruptedException { + JobId queryJob = JobId.of(PROJECT, JOB); + com.google.api.services.bigquery.model.Job jobResponsePb = + new com.google.api.services.bigquery.model.Job() + .setConfiguration(QUERY_JOB_CONFIGURATION_FOR_QUERY.toPb()) + .setJobReference(queryJob.toPb()) + .setId(JOB) + .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")); + jobResponsePb.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb()); + GetQueryResultsResponse responsePb = + new GetQueryResultsResponse() + .setJobReference(queryJob.toPb()) + .setRows(ImmutableList.of(TABLE_ROW)) + .setJobComplete(true) + .setCacheHit(false) + .setPageToken(CURSOR) + .setTotalBytesProcessed(42L) + .setTotalRows(BigInteger.valueOf(1L)) + .setSchema(TABLE_SCHEMA.toPb()); + EasyMock.expect( bigqueryRpcMock.create(JOB_INFO.toPb(), Collections.emptyMap())) .andReturn(jobResponsePb); @@ -1139,9 +1188,7 @@ public void testQueryRequestCompleted() throws InterruptedException { bigqueryRpcMock.getQueryResults( PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) .andReturn(responsePb); - EasyMock.expect( - bigqueryRpcMock.listTableData( - PROJECT, DATASET, TABLE, Collections.emptyMap())) + EasyMock.expect(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, optionMap)) .andReturn( new TableDataList() .setPageToken("") @@ -1151,7 +1198,8 @@ public void testQueryRequestCompleted() throws InterruptedException { EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); // TODO(pongad): pagesize = 42 - TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob); + Job job = bigquery.create(JobInfo.of(queryJob, QUERY_JOB_CONFIGURATION_FOR_QUERY)); + TableResult result = job.getQueryResults(pageSizeOption); assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA); assertThat(result.getTotalRows()).isEqualTo(1); for (FieldValueList row : result.getValues()) { @@ -1196,10 +1244,6 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti JOB_INFO.toPb(), Collections.emptyMap())) .andReturn(jobResponsePb1); - QueryResultsOption pageSizeOption = QueryResultsOption.pageSize(42L); - Map optionMap = Maps.newEnumMap(BigQueryRpc.Option.class); - optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue()); - EasyMock.expect( bigqueryRpcMock.getQueryResults( PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))