Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Unreleased
* Wrap gRPC StatusRuntimeException across all DurableTaskGrpcClient methods ([#278](https://github.com/microsoft/durabletask-java/pull/278))
* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items ([#275](https://github.com/microsoft/durabletask-java/pull/275))
* Add support for calls to HTTP endpoints ([#271](https://github.com/microsoft/durabletask-java/pull/271))
* Add getSuspendPostUri and getResumePostUri getters to HttpManagementPayload ([#264](https://github.com/microsoft/durabletask-java/pull/264))
Expand Down
1 change: 1 addition & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {
implementation "io.opentelemetry:opentelemetry-api:${openTelemetryVersion}"
implementation "io.opentelemetry:opentelemetry-context:${openTelemetryVersion}"

testImplementation "io.grpc:grpc-inprocess:${grpcVersion}"
testImplementation "io.opentelemetry:opentelemetry-sdk:${openTelemetryVersion}"
testImplementation "io.opentelemetry:opentelemetry-sdk-trace:${openTelemetryVersion}"
testImplementation "io.opentelemetry:opentelemetry-sdk-testing:${openTelemetryVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public String scheduleNewOrchestrationInstance(
CreateInstanceRequest request = builder.build();
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
return response.getInstanceId();
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "scheduleNewOrchestrationInstance");
} finally {
createScope.close();
createSpan.end();
Expand All @@ -184,7 +186,11 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
}

RaiseEventRequest request = builder.build();
this.sidecarClient.raiseEvent(request);
try {
this.sidecarClient.raiseEvent(request);
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "raiseEvent");
}
}

@Override
Expand All @@ -193,8 +199,12 @@ public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getI
.setInstanceId(instanceId)
.setGetInputsAndOutputs(getInputsAndOutputs)
.build();
GetInstanceResponse response = this.sidecarClient.getInstance(request);
return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs());
try {
GetInstanceResponse response = this.sidecarClient.getInstance(request);
return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "getInstanceMetadata");
}
}

@Override
Expand All @@ -219,7 +229,13 @@ public OrchestrationMetadata waitForInstanceStart(String instanceId, Duration ti
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
throw new TimeoutException("Start orchestration timeout reached.");
}
throw e;
Exception translated = StatusRuntimeExceptionHelper.toException(e, "waitForInstanceStart");
if (translated instanceof TimeoutException) {
throw (TimeoutException) translated;
} else if (translated instanceof RuntimeException) {
throw (RuntimeException) translated;
}
throw new RuntimeException(translated);
}
return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs());
}
Expand All @@ -246,7 +262,13 @@ public OrchestrationMetadata waitForInstanceCompletion(String instanceId, Durati
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
throw new TimeoutException("Orchestration instance completion timeout reached.");
}
throw e;
Exception translated = StatusRuntimeExceptionHelper.toException(e, "waitForInstanceCompletion");
if (translated instanceof TimeoutException) {
throw (TimeoutException) translated;
} else if (translated instanceof RuntimeException) {
throw (RuntimeException) translated;
}
throw new RuntimeException(translated);
}
return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs());
}
Expand All @@ -263,7 +285,11 @@ public void terminate(String instanceId, @Nullable Object output) {
if (serializeOutput != null){
builder.setOutput(StringValue.of(serializeOutput));
}
this.sidecarClient.terminateInstance(builder.build());
try {
this.sidecarClient.terminateInstance(builder.build());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "terminate");
}
}

@Override
Expand All @@ -277,8 +303,12 @@ public OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery qu
instanceQueryBuilder.setMaxInstanceCount(query.getMaxInstanceCount());
query.getRuntimeStatusList().forEach(runtimeStatus -> Optional.ofNullable(runtimeStatus).ifPresent(status -> instanceQueryBuilder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status))));
query.getTaskHubNames().forEach(taskHubName -> Optional.ofNullable(taskHubName).ifPresent(name -> instanceQueryBuilder.addTaskHubNames(StringValue.of(name))));
QueryInstancesResponse queryInstancesResponse = this.sidecarClient.queryInstances(QueryInstancesRequest.newBuilder().setQuery(instanceQueryBuilder).build());
return toQueryResult(queryInstancesResponse, query.isFetchInputsAndOutputs());
try {
QueryInstancesResponse queryInstancesResponse = this.sidecarClient.queryInstances(QueryInstancesRequest.newBuilder().setQuery(instanceQueryBuilder).build());
return toQueryResult(queryInstancesResponse, query.isFetchInputsAndOutputs());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "queryInstances");
}
}

private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse queryInstancesResponse, boolean fetchInputsAndOutputs){
Expand All @@ -291,12 +321,20 @@ private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse quer

@Override
public void createTaskHub(boolean recreateIfExists) {
this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build());
try {
this.sidecarClient.createTaskHub(CreateTaskHubRequest.newBuilder().setRecreateIfExists(recreateIfExists).build());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "createTaskHub");
}
}

@Override
public void deleteTaskHub() {
this.sidecarClient.deleteTaskHub(DeleteTaskHubRequest.newBuilder().build());
try {
this.sidecarClient.deleteTaskHub(DeleteTaskHubRequest.newBuilder().build());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "deleteTaskHub");
}
}

@Override
Expand All @@ -305,8 +343,12 @@ public PurgeResult purgeInstance(String instanceId) {
.setInstanceId(instanceId)
.build();

PurgeInstancesResponse response = this.sidecarClient.purgeInstances(request);
return toPurgeResult(response);
try {
PurgeInstancesResponse response = this.sidecarClient.purgeInstances(request);
return toPurgeResult(response);
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "purgeInstance");
}
}

@Override
Expand Down Expand Up @@ -334,7 +376,13 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
String timeOutException = String.format("Purge instances timeout duration of %s reached.", timeout);
throw new TimeoutException(timeOutException);
}
throw e;
Exception translated = StatusRuntimeExceptionHelper.toException(e, "purgeInstances");
if (translated instanceof TimeoutException) {
throw (TimeoutException) translated;
} else if (translated instanceof RuntimeException) {
throw (RuntimeException) translated;
}
throw new RuntimeException(translated);
}
}

Expand All @@ -345,7 +393,11 @@ public void suspendInstance(String instanceId, @Nullable String reason) {
if (reason != null) {
suspendRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
try {
this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "suspendInstance");
}
}

@Override
Expand All @@ -355,7 +407,11 @@ public void resumeInstance(String instanceId, @Nullable String reason) {
if (reason != null) {
resumeRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
try {
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
} catch (StatusRuntimeException e) {
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "resumeInstance");
}
}

@Override
Expand All @@ -377,7 +433,7 @@ public void rewindInstance(String instanceId, @Nullable String reason) {
throw new IllegalStateException(
"Orchestration instance '" + instanceId + "' is not in a failed state and cannot be rewound.", e);
}
throw e;
throw StatusRuntimeExceptionHelper.toRuntimeException(e, "rewindInstance");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;

/**
* Utility class to translate gRPC {@link StatusRuntimeException} into SDK-level exceptions.
* This ensures callers do not need to depend on gRPC types directly.
*
* <p>Status code mappings:
* <ul>
* <li>{@code CANCELLED} → {@link CancellationException}</li>
* <li>{@code DEADLINE_EXCEEDED} → {@link TimeoutException} (via {@link #toException})</li>
* <li>{@code INVALID_ARGUMENT} → {@link IllegalArgumentException}</li>
* <li>{@code FAILED_PRECONDITION} → {@link IllegalStateException}</li>
* <li>{@code NOT_FOUND} → {@link IllegalArgumentException}</li>
* <li>{@code UNIMPLEMENTED} → {@link UnsupportedOperationException}</li>
* <li>All other codes → {@link RuntimeException}</li>
* </ul>
*/
final class StatusRuntimeExceptionHelper {

/**
* Translates a {@link StatusRuntimeException} into an appropriate SDK-level unchecked exception.
*
* @param e the gRPC exception to translate
* @param operationName the name of the operation that failed, used in exception messages
* @return a translated RuntimeException (never returns null)
*/
static RuntimeException toRuntimeException(StatusRuntimeException e, String operationName) {
Status.Code code = e.getStatus().getCode();
String message = formatMessage(operationName, code, getDescriptionOrDefault(e));
switch (code) {
case CANCELLED:
return createCancellationException(e, operationName);
case INVALID_ARGUMENT:
return new IllegalArgumentException(message, e);
case FAILED_PRECONDITION:
return new IllegalStateException(message, e);
case NOT_FOUND:
return new IllegalArgumentException(message, e);
case UNIMPLEMENTED:
return new UnsupportedOperationException(message, e);
default:
return new RuntimeException(message, e);
}
}

/**
* Translates a {@link StatusRuntimeException} into an appropriate SDK-level checked exception
* for operations that declare {@code throws TimeoutException}.
* <p>
* Note: The DEADLINE_EXCEEDED case is included for completeness and future-proofing, even
* though current call sites handle DEADLINE_EXCEEDED before falling through to this method.
* This ensures centralized translation if call sites are refactored in the future.
*
* @param e the gRPC exception to translate
* @param operationName the name of the operation that failed, used in exception messages
* @return a translated Exception (never returns null)
*/
static Exception toException(StatusRuntimeException e, String operationName) {
Status.Code code = e.getStatus().getCode();
String message = formatMessage(operationName, code, getDescriptionOrDefault(e));
switch (code) {
case DEADLINE_EXCEEDED:
return new TimeoutException(message);
case CANCELLED:
return createCancellationException(e, operationName);
case INVALID_ARGUMENT:
return new IllegalArgumentException(message, e);
case FAILED_PRECONDITION:
return new IllegalStateException(message, e);
case NOT_FOUND:
return new IllegalArgumentException(message, e);
case UNIMPLEMENTED:
return new UnsupportedOperationException(message, e);
default:
return new RuntimeException(message, e);
}
}

private static CancellationException createCancellationException(
StatusRuntimeException e, String operationName) {
CancellationException ce = new CancellationException(
"The " + operationName + " operation was canceled.");
ce.initCause(e);
return ce;
}

private static String formatMessage(String operationName, Status.Code code, String description) {
return "The " + operationName + " operation failed with a " + code + " gRPC status: " + description;
}

private static String getDescriptionOrDefault(StatusRuntimeException e) {
String description = e.getStatus().getDescription();
return description != null ? description : "(no description)";
}

// Cannot be instantiated
private StatusRuntimeExceptionHelper() {
}
}
Loading
Loading