Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery.JobOption;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
import com.google.cloud.bigquery.BigQuery.TableDataListOption;
import com.google.cloud.bigquery.JobConfiguration.Type;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -62,11 +66,9 @@ public class Job extends JobInfo {
.setMaxRetryDelay(Duration.ofSeconds(3L))
.build();

static final QueryResultsOption[] DEFAULT_QUERY_WAIT_OPTIONS =
new QueryResultsOption[] {
QueryResultsOption.pageSize(0L),
QueryResultsOption.maxWaitTime(Duration.ofMinutes(1).toMillis())
};
static final QueryResultsOption[] DEFAULT_QUERY_WAIT_OPTIONS = {
QueryResultsOption.pageSize(0L),
};

private final BigQueryOptions options;
private transient BigQuery bigquery;
Expand Down Expand Up @@ -272,13 +274,35 @@ public TableResult getQueryResults(QueryResultsOption... options)
}

TableId table = ((QueryJobConfiguration) getConfiguration()).getDestinationTable();
// TODO(pongad): merge options?

List<QueryResultsOption> waitOptions =
new ArrayList<>(Arrays.asList(DEFAULT_QUERY_WAIT_OPTIONS));
List<TableDataListOption> listOptions = new ArrayList<>();
for (QueryResultsOption option : options) {
switch (option.getRpcOption()) {
case MAX_RESULTS:
listOptions.add(TableDataListOption.pageSize((Long) option.getValue()));
break;
case PAGE_TOKEN:
listOptions.add(TableDataListOption.pageToken((String) option.getValue()));
break;
case START_INDEX:
listOptions.add(TableDataListOption.startIndex((Long) option.getValue()));
break;
case TIMEOUT:
waitOptions.add(QueryResultsOption.maxWaitTime((Long) option.getValue()));
break;
}
}

QueryResponse response =
waitForQueryResults(DEFAULT_JOB_WAIT_SETTINGS, DEFAULT_QUERY_WAIT_OPTIONS);
waitForQueryResults(
DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0]));
if (response.getSchema() == null) {
throw new JobException(getJobId(), response.getErrors());
}
return bigquery.listTableData(table, response.getSchema());
return bigquery.listTableData(
table, response.getSchema(), listOptions.toArray(new TableDataListOption[0]));
}

private QueryResponse waitForQueryResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,55 @@ public void testQueryRequestCompleted() throws InterruptedException {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

EasyMock.expect(
bigqueryRpcMock.create(
JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.andReturn(jobResponsePb);
EasyMock.expect(
bigqueryRpcMock.getQueryResults(
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.andReturn(responsePb);
EasyMock.expect(
bigqueryRpcMock.listTableData(
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
.andReturn(
new TableDataList()
.setPageToken("")
.setRows(ImmutableList.of(TABLE_ROW))
.setTotalRows(1L));

EasyMock.replay(bigqueryRpcMock);
bigquery = options.getService();
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob);
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
assertThat(result.getTotalRows()).isEqualTo(1);
for (FieldValueList row : result.getValues()) {
assertThat(row.get(0).getBooleanValue()).isFalse();
assertThat(row.get(1).getLongValue()).isEqualTo(1);
}
}

@Test
public void testQueryRequestCompletedOptions() throws InterruptedException {
JobId queryJob = JobId.of(PROJECT, JOB);
com.google.api.services.bigquery.model.Job jobResponsePb =
new com.google.api.services.bigquery.model.Job()
.setConfiguration(QUERY_JOB_CONFIGURATION_FOR_QUERY.toPb())
.setJobReference(queryJob.toPb())
.setId(JOB)
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"));
jobResponsePb.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
GetQueryResultsResponse responsePb =
new GetQueryResultsResponse()
.setJobReference(queryJob.toPb())
.setRows(ImmutableList.of(TABLE_ROW))
.setJobComplete(true)
.setCacheHit(false)
.setPageToken(CURSOR)
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

EasyMock.expect(
bigqueryRpcMock.create(JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.andReturn(jobResponsePb);
Expand All @@ -1139,9 +1188,7 @@ public void testQueryRequestCompleted() throws InterruptedException {
bigqueryRpcMock.getQueryResults(
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.andReturn(responsePb);
EasyMock.expect(
bigqueryRpcMock.listTableData(
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
EasyMock.expect(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, optionMap))
.andReturn(
new TableDataList()
.setPageToken("")
Expand All @@ -1151,7 +1198,8 @@ public void testQueryRequestCompleted() throws InterruptedException {
EasyMock.replay(bigqueryRpcMock);
bigquery = options.getService();
// TODO(pongad): pagesize = 42
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob);
Job job = bigquery.create(JobInfo.of(queryJob, QUERY_JOB_CONFIGURATION_FOR_QUERY));
TableResult result = job.getQueryResults(pageSizeOption);
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
assertThat(result.getTotalRows()).isEqualTo(1);
for (FieldValueList row : result.getValues()) {
Expand Down Expand Up @@ -1196,10 +1244,6 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
JOB_INFO.toPb(), Collections.<BigQueryRpc.Option, Object>emptyMap()))
.andReturn(jobResponsePb1);

QueryResultsOption pageSizeOption = QueryResultsOption.pageSize(42L);
Map<BigQueryRpc.Option, Object> optionMap = Maps.newEnumMap(BigQueryRpc.Option.class);
optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue());

EasyMock.expect(
bigqueryRpcMock.getQueryResults(
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
Expand Down