From c76b693db0da1017d38669c80ecf6ecb89e8f8bc Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 15 Dec 2017 18:33:47 +1100 Subject: [PATCH 1/2] bigquery: proper handle options to Job.getQueryResults --- .../java/com/google/cloud/bigquery/Job.java | 36 ++++++++++- .../cloud/bigquery/BigQueryImplTest.java | 60 ++++++++++++++++--- 2 files changed, 85 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index b1d589c712c9..67671c18573f 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -25,9 +25,12 @@ import com.google.cloud.RetryOption; import com.google.cloud.bigquery.BigQuery.JobOption; import com.google.cloud.bigquery.BigQuery.QueryResultsOption; +import com.google.cloud.bigquery.BigQuery.TableDataListOption; import com.google.cloud.bigquery.JobConfiguration.Type; import java.io.IOException; import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -272,13 +275,40 @@ public TableResult getQueryResults(QueryResultsOption... options) } TableId table = ((QueryJobConfiguration) getConfiguration()).getDestinationTable(); - // TODO(pongad): merge options? + + List waitOptions = new ArrayList<>(); + waitOptions.add(QueryResultsOption.pageSize(0L)); + List listOptions = new ArrayList<>(); + boolean hasWaitTime = false; + for (QueryResultsOption option : options) { + switch (option.getRpcOption()) { + case MAX_RESULTS: + listOptions.add(TableDataListOption.pageSize((Long) option.getValue())); + break; + case PAGE_TOKEN: + listOptions.add(TableDataListOption.pageToken((String) option.getValue())); + break; + case START_INDEX: + listOptions.add(TableDataListOption.startIndex((Long) option.getValue())); + break; + case TIMEOUT: + hasWaitTime = true; + waitOptions.add(QueryResultsOption.maxWaitTime((Long) option.getValue())); + break; + } + } + if (!hasWaitTime) { + waitOptions.add(QueryResultsOption.maxWaitTime(Duration.ofMinutes(1).toMillis())); + } + QueryResponse response = - waitForQueryResults(DEFAULT_JOB_WAIT_SETTINGS, DEFAULT_QUERY_WAIT_OPTIONS); + waitForQueryResults( + DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0])); if (response.getSchema() == null) { throw new JobException(getJobId(), response.getErrors()); } - return bigquery.listTableData(table, response.getSchema()); + return bigquery.listTableData( + table, response.getSchema(), listOptions.toArray(new TableDataListOption[0])); } private QueryResponse waitForQueryResults( diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index bf5ab801aefc..b18ffcccb71a 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -1127,6 +1127,55 @@ public void testQueryRequestCompleted() throws InterruptedException { .setTotalRows(BigInteger.valueOf(1L)) .setSchema(TABLE_SCHEMA.toPb()); + EasyMock.expect( + bigqueryRpcMock.create( + JOB_INFO.toPb(), Collections.emptyMap())) + .andReturn(jobResponsePb); + EasyMock.expect( + bigqueryRpcMock.getQueryResults( + PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + .andReturn(responsePb); + EasyMock.expect( + bigqueryRpcMock.listTableData( + PROJECT, DATASET, TABLE, Collections.emptyMap())) + .andReturn( + new TableDataList() + .setPageToken("") + .setRows(ImmutableList.of(TABLE_ROW)) + .setTotalRows(1L)); + + EasyMock.replay(bigqueryRpcMock); + bigquery = options.getService(); + TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob); + assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA); + assertThat(result.getTotalRows()).isEqualTo(1); + for (FieldValueList row : result.getValues()) { + assertThat(row.get(0).getBooleanValue()).isFalse(); + assertThat(row.get(1).getLongValue()).isEqualTo(1); + } + } + + @Test + public void testQueryRequestCompletedOptions() throws InterruptedException { + JobId queryJob = JobId.of(PROJECT, JOB); + com.google.api.services.bigquery.model.Job jobResponsePb = + new com.google.api.services.bigquery.model.Job() + .setConfiguration(QUERY_JOB_CONFIGURATION_FOR_QUERY.toPb()) + .setJobReference(queryJob.toPb()) + .setId(JOB) + .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")); + jobResponsePb.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb()); + GetQueryResultsResponse responsePb = + new GetQueryResultsResponse() + .setJobReference(queryJob.toPb()) + .setRows(ImmutableList.of(TABLE_ROW)) + .setJobComplete(true) + .setCacheHit(false) + .setPageToken(CURSOR) + .setTotalBytesProcessed(42L) + .setTotalRows(BigInteger.valueOf(1L)) + .setSchema(TABLE_SCHEMA.toPb()); + EasyMock.expect( bigqueryRpcMock.create(JOB_INFO.toPb(), Collections.emptyMap())) .andReturn(jobResponsePb); @@ -1139,9 +1188,7 @@ public void testQueryRequestCompleted() throws InterruptedException { bigqueryRpcMock.getQueryResults( PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) .andReturn(responsePb); - EasyMock.expect( - bigqueryRpcMock.listTableData( - PROJECT, DATASET, TABLE, Collections.emptyMap())) + EasyMock.expect(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, optionMap)) .andReturn( new TableDataList() .setPageToken("") @@ -1151,7 +1198,8 @@ public void testQueryRequestCompleted() throws InterruptedException { EasyMock.replay(bigqueryRpcMock); bigquery = options.getService(); // TODO(pongad): pagesize = 42 - TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob); + Job job = bigquery.create(JobInfo.of(queryJob, QUERY_JOB_CONFIGURATION_FOR_QUERY)); + TableResult result = job.getQueryResults(pageSizeOption); assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA); assertThat(result.getTotalRows()).isEqualTo(1); for (FieldValueList row : result.getValues()) { @@ -1196,10 +1244,6 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti JOB_INFO.toPb(), Collections.emptyMap())) .andReturn(jobResponsePb1); - QueryResultsOption pageSizeOption = QueryResultsOption.pageSize(42L); - Map optionMap = Maps.newEnumMap(BigQueryRpc.Option.class); - optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue()); - EasyMock.expect( bigqueryRpcMock.getQueryResults( PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) From e2fa800403ebaa1180744456158d18db84744a68 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 18 Dec 2017 14:58:40 +1100 Subject: [PATCH 2/2] pr comment --- .../java/com/google/cloud/bigquery/Job.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index 67671c18573f..e7cc2712ea8a 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; @@ -65,11 +66,9 @@ public class Job extends JobInfo { .setMaxRetryDelay(Duration.ofSeconds(3L)) .build(); - static final QueryResultsOption[] DEFAULT_QUERY_WAIT_OPTIONS = - new QueryResultsOption[] { - QueryResultsOption.pageSize(0L), - QueryResultsOption.maxWaitTime(Duration.ofMinutes(1).toMillis()) - }; + static final QueryResultsOption[] DEFAULT_QUERY_WAIT_OPTIONS = { + QueryResultsOption.pageSize(0L), + }; private final BigQueryOptions options; private transient BigQuery bigquery; @@ -276,10 +275,9 @@ public TableResult getQueryResults(QueryResultsOption... options) TableId table = ((QueryJobConfiguration) getConfiguration()).getDestinationTable(); - List waitOptions = new ArrayList<>(); - waitOptions.add(QueryResultsOption.pageSize(0L)); + List waitOptions = + new ArrayList<>(Arrays.asList(DEFAULT_QUERY_WAIT_OPTIONS)); List listOptions = new ArrayList<>(); - boolean hasWaitTime = false; for (QueryResultsOption option : options) { switch (option.getRpcOption()) { case MAX_RESULTS: @@ -292,14 +290,10 @@ public TableResult getQueryResults(QueryResultsOption... options) listOptions.add(TableDataListOption.startIndex((Long) option.getValue())); break; case TIMEOUT: - hasWaitTime = true; waitOptions.add(QueryResultsOption.maxWaitTime((Long) option.getValue())); break; } } - if (!hasWaitTime) { - waitOptions.add(QueryResultsOption.maxWaitTime(Duration.ofMinutes(1).toMillis())); - } QueryResponse response = waitForQueryResults(