diff --git a/src/main/idls b/src/main/idls index 615f1ca8b..b1a9b9ede 160000 --- a/src/main/idls +++ b/src/main/idls @@ -1 +1 @@ -Subproject commit 615f1ca8b6d101b40b6006bc900adde7f846a9ec +Subproject commit b1a9b9ede5fbd29a5b67cb8e082f602488ce0446 diff --git a/src/main/java/com/uber/cadence/client/WorkflowAlreadyCompletedException.java b/src/main/java/com/uber/cadence/client/WorkflowAlreadyCompletedException.java new file mode 100644 index 000000000..91db29697 --- /dev/null +++ b/src/main/java/com/uber/cadence/client/WorkflowAlreadyCompletedException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.client; + +import com.uber.cadence.WorkflowExecution; +import java.util.Optional; + +/** + * Thrown when workflow already completed its execution and when the client is trying to run an + * operation on the workflow like signal, terminate, cancel, poll etc. + */ +public final class WorkflowAlreadyCompletedException extends WorkflowException { + + public WorkflowAlreadyCompletedException( + WorkflowExecution execution, Optional workflowType, String message) { + super(message, execution, workflowType, null); + } +} diff --git a/src/main/java/com/uber/cadence/internal/Version.java b/src/main/java/com/uber/cadence/internal/Version.java index ddb833637..6770e6f0a 100644 --- a/src/main/java/com/uber/cadence/internal/Version.java +++ b/src/main/java/com/uber/cadence/internal/Version.java @@ -43,7 +43,7 @@ public class Version { * support. This can be used for client capibility check, on Cadence server, for backward * compatibility Format: MAJOR.MINOR.PATCH */ - public static final String FEATURE_VERSION = "1.3.0"; + public static final String FEATURE_VERSION = "1.4.0"; static { // Load version from version.properties generated by Gradle into build/resources/main directory. diff --git a/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java index 4f27f6f87..0960273f0 100644 --- a/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java +++ b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java @@ -25,6 +25,7 @@ import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.QueryFailedError; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.common.RetryOptions; import java.time.Duration; @@ -59,6 +60,7 @@ public final class RpcRetryer { roBuilder.setDoNotRetry( BadRequestError.class, EntityNotExistsError.class, + WorkflowExecutionAlreadyCompletedError.class, WorkflowExecutionAlreadyStartedError.class, DomainAlreadyExistsError.class, QueryFailedError.class, diff --git a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java index 3367e705a..b41bb0c31 100644 --- a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java @@ -27,6 +27,7 @@ import com.uber.cadence.RespondActivityTaskFailedByIDRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.client.ActivityCancelledException; import com.uber.cadence.client.ActivityCompletionFailureException; import com.uber.cadence.client.ActivityNotExistsException; @@ -99,6 +100,8 @@ public void complete(Object result) { metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); + } catch (WorkflowExecutionAlreadyCompletedError e) { + throw new ActivityNotExistsException(e); } catch (TException e) { throw new ActivityCompletionFailureException(e); } @@ -119,6 +122,8 @@ public void complete(Object result) { metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_BY_ID_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); + } catch (WorkflowExecutionAlreadyCompletedError e) { + throw new ActivityNotExistsException(e); } catch (TException e) { throw new ActivityCompletionFailureException(activityId, e); } @@ -141,6 +146,8 @@ public void fail(Throwable failure) { metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); + } catch (WorkflowExecutionAlreadyCompletedError e) { + throw new ActivityNotExistsException(e); } catch (TException e) { throw new ActivityCompletionFailureException(e); } @@ -156,6 +163,8 @@ public void fail(Throwable failure) { metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); + } catch (WorkflowExecutionAlreadyCompletedError e) { + throw new ActivityNotExistsException(e); } catch (TException e) { throw new ActivityCompletionFailureException(activityId, e); } @@ -176,6 +185,8 @@ public void recordHeartbeat(Object details) throws CancellationException { } } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); + } catch (WorkflowExecutionAlreadyCompletedError e) { + throw new ActivityNotExistsException(e); } catch (TException e) { throw new ActivityCompletionFailureException(e); } diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityExecutionContextImpl.java b/src/main/java/com/uber/cadence/internal/sync/ActivityExecutionContextImpl.java index d6f2af461..f71f05f0c 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityExecutionContextImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityExecutionContextImpl.java @@ -22,6 +22,7 @@ import com.uber.cadence.RecordActivityTaskHeartbeatRequest; import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.activity.ActivityTask; import com.uber.cadence.client.ActivityCancelledException; import com.uber.cadence.client.ActivityCompletionException; @@ -176,6 +177,8 @@ private void sendHeartbeatRequest(Object details) throws TException { } } catch (EntityNotExistsError e) { lastException = new ActivityNotExistsException(task, e); + } catch (WorkflowExecutionAlreadyCompletedError e) { + throw new ActivityNotExistsException(task, e); } catch (BadRequestError e) { lastException = new ActivityCompletionFailureException(task, e); } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 30dcb206d..613f4d776 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -337,7 +337,8 @@ private WorkflowServiceWrapper(IWorkflowService impl) { @Override public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( RecordActivityTaskHeartbeatRequest heartbeatRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { if (activityHeartbetListener != null) { Object details = testEnvironmentOptions @@ -355,60 +356,68 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( @Override public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, DomainNotActiveError, - LimitExceededError, ServiceBusyError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, DomainNotActiveError, LimitExceededError, + ServiceBusyError, TException { return impl.RecordActivityTaskHeartbeatByID(heartbeatRequest); } @Override public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest completeRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondActivityTaskCompleted(completeRequest); } @Override public void RespondActivityTaskCompletedByID( RespondActivityTaskCompletedByIDRequest completeRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondActivityTaskCompletedByID(completeRequest); } @Override public void RespondActivityTaskFailed(RespondActivityTaskFailedRequest failRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondActivityTaskFailed(failRequest); } @Override public void RespondActivityTaskFailedByID(RespondActivityTaskFailedByIDRequest failRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondActivityTaskFailedByID(failRequest); } @Override public void RespondActivityTaskCanceled(RespondActivityTaskCanceledRequest canceledRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondActivityTaskCanceled(canceledRequest); } @Override public void RespondActivityTaskCanceledByID( RespondActivityTaskCanceledByIDRequest canceledRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondActivityTaskCanceledByID(canceledRequest); } @Override public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, - CancellationAlreadyRequestedError, ServiceBusyError, TException { + CancellationAlreadyRequestedError, ServiceBusyError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RequestCancelWorkflowExecution(cancelRequest); } @Override public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, - TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, ServiceBusyError, TException { impl.SignalWorkflowExecution(signalRequest); } @@ -431,8 +440,8 @@ public ResetWorkflowExecutionResponse ResetWorkflowExecution( @Override public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest terminateRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError, - TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, ServiceBusyError, TException { impl.TerminateWorkflowExecution(terminateRequest); } @@ -492,7 +501,8 @@ public GetSearchAttributesResponse GetSearchAttributes() @Override public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondQueryTaskCompleted(completeRequest); } @@ -882,13 +892,15 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques @Override public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( RespondDecisionTaskCompletedRequest completeRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { return impl.RespondDecisionTaskCompleted(completeRequest); } @Override public void RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest failedRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondDecisionTaskFailed(failedRequest); } diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 839fc8eac..90a9b00c8 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -82,6 +82,7 @@ import com.uber.cadence.UpdateDomainRequest; import com.uber.cadence.UpdateDomainResponse; import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.client.ActivityCompletionClient; import com.uber.cadence.client.WorkflowClient; @@ -764,13 +765,15 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques @Override public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( RespondDecisionTaskCompletedRequest completeRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { return impl.RespondDecisionTaskCompleted(completeRequest); } @Override public void RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest failedRequest) - throws BadRequestError, InternalServiceError, EntityNotExistsError, TException { + throws BadRequestError, InternalServiceError, EntityNotExistsError, + WorkflowExecutionAlreadyCompletedError, TException { impl.RespondDecisionTaskFailed(failedRequest); } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index cc5aa722a..978720a23 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -23,6 +23,7 @@ import com.uber.cadence.QueryRejectCondition; import com.uber.cadence.QueryWorkflowResponse; import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.WorkflowType; import com.uber.cadence.client.*; @@ -390,6 +391,9 @@ private R mapToWorkflowFailureException( execution.get(), workflowType, executionFailed.getDecisionTaskCompletedEventId(), cause); } else if (failure instanceof EntityNotExistsError) { throw new WorkflowNotFoundException(execution.get(), workflowType, failure.getMessage()); + } else if (failure instanceof WorkflowExecutionAlreadyCompletedError) { + throw new WorkflowAlreadyCompletedException( + execution.get(), workflowType, failure.getMessage()); } else if (failure instanceof CancellationException) { throw (CancellationException) failure; } else if (failure instanceof WorkflowException) { diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java index 670152227..c14b2081b 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java @@ -48,6 +48,7 @@ import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes; import com.uber.cadence.StartWorkflowExecutionRequest; import com.uber.cadence.StickyExecutionAttributes; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionCloseStatus; import com.uber.cadence.internal.testservice.TestWorkflowMutableStateImpl.QueryId; import java.util.Optional; @@ -76,7 +77,8 @@ void failSignalExternalWorkflowExecution( throws EntityNotExistsError, InternalServiceError, BadRequestError; void failDecisionTask(RespondDecisionTaskFailedRequest request) - throws InternalServiceError, EntityNotExistsError, BadRequestError; + throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void childWorkflowStarted(ChildWorkflowExecutionStartedEventAttributes a) throws InternalServiceError, EntityNotExistsError, BadRequestError; @@ -104,34 +106,44 @@ void startActivityTask(PollForActivityTaskResponse task, PollForActivityTaskRequ throws InternalServiceError, EntityNotExistsError, BadRequestError; void completeActivityTask(String activityId, RespondActivityTaskCompletedRequest request) - throws InternalServiceError, EntityNotExistsError, BadRequestError; + throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void completeActivityTaskById(String activityId, RespondActivityTaskCompletedByIDRequest request) - throws InternalServiceError, EntityNotExistsError, BadRequestError; + throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void failActivityTask(String activityId, RespondActivityTaskFailedRequest request) - throws InternalServiceError, EntityNotExistsError, BadRequestError; + throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void failActivityTaskById(String id, RespondActivityTaskFailedByIDRequest failRequest) - throws EntityNotExistsError, InternalServiceError, BadRequestError; + throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; RecordActivityTaskHeartbeatResponse heartbeatActivityTask(String activityId, byte[] details) - throws InternalServiceError, EntityNotExistsError, BadRequestError; + throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void signal(SignalWorkflowExecutionRequest signalRequest) - throws EntityNotExistsError, InternalServiceError, BadRequestError; + throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void signalFromWorkflow(SignalExternalWorkflowExecutionDecisionAttributes a) - throws EntityNotExistsError, InternalServiceError, BadRequestError; + throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest) - throws EntityNotExistsError, InternalServiceError, BadRequestError; + throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void cancelActivityTask(String id, RespondActivityTaskCanceledRequest canceledRequest) - throws EntityNotExistsError, InternalServiceError, BadRequestError; + throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; void cancelActivityTaskById(String id, RespondActivityTaskCanceledByIDRequest canceledRequest) - throws EntityNotExistsError, InternalServiceError, BadRequestError; + throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError, + BadRequestError; QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TException; diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java index c0ab74720..2a6a53a89 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java @@ -84,6 +84,7 @@ import com.uber.cadence.UpsertWorkflowSearchAttributesDecisionAttributes; import com.uber.cadence.UpsertWorkflowSearchAttributesEventAttributes; import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionCloseStatus; import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes; import com.uber.cadence.WorkflowExecutionSignaledEventAttributes; @@ -1181,10 +1182,12 @@ public void startActivityTask( }); } - private void checkCompleted() throws EntityNotExistsError { + private void checkCompleted() + throws EntityNotExistsError, WorkflowExecutionAlreadyCompletedError { State workflowState = workflow.getState(); if (isTerminalState(workflowState)) { - throw new EntityNotExistsError("Workflow is already completed: " + workflowState); + throw new WorkflowExecutionAlreadyCompletedError( + "Workflow is already completed: " + workflowState); } } diff --git a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java index f73fa3c6f..79f3385fb 100644 --- a/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java +++ b/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java @@ -87,6 +87,7 @@ import com.uber.cadence.UpdateDomainRequest; import com.uber.cadence.UpdateDomainResponse; import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.WorkflowExecutionCloseStatus; import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes; @@ -567,7 +568,8 @@ public void signalExternalWorkflowExecution( String signalId, SignalExternalWorkflowExecutionDecisionAttributes a, TestWorkflowMutableState source) - throws InternalServiceError, EntityNotExistsError, BadRequestError { + throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError, + BadRequestError { ExecutionId executionId = new ExecutionId(a.getDomain(), a.getExecution()); TestWorkflowMutableState mutableState = null; try { diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 8efa3bbd4..cd9d5c92c 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -85,6 +85,7 @@ import com.uber.cadence.TerminateWorkflowExecutionRequest; import com.uber.cadence.UpdateDomainRequest; import com.uber.cadence.UpdateDomainResponse; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.WorkflowService; import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result; @@ -292,6 +293,7 @@ private T measureRemoteCall(String scopeName, RemoteCall call) throws TEx sw.stop(); return resp; } catch (EntityNotExistsError + | WorkflowExecutionAlreadyCompletedError | BadRequestError | DomainAlreadyExistsError | WorkflowExecutionAlreadyStartedError @@ -862,7 +864,7 @@ private RecordActivityTaskHeartbeatResponse recordActivityTaskHeartbeat( public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( RecordActivityTaskHeartbeatByIDRequest heartbeatRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, DomainNotActiveError, - LimitExceededError, ServiceBusyError, TException { + WorkflowExecutionAlreadyCompletedError, LimitExceededError, ServiceBusyError, TException { return measureRemoteCall( ServiceMethod.RECORD_ACTIVITY_TASK_HEARTBEAT_BY_ID, () -> recordActivityTaskHeartbeatByID(heartbeatRequest)); @@ -1818,7 +1820,8 @@ private QueryWorkflowResponse queryWorkflow(QueryWorkflowRequest queryRequest) t @Override public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListRequest resetRequest) throws BadRequestError, InternalServiceError, EntityNotExistsError, LimitExceededError, - ServiceBusyError, DomainNotActiveError, TException { + WorkflowExecutionAlreadyCompletedError, ServiceBusyError, DomainNotActiveError, + TException { return measureRemoteCall( ServiceMethod.RESET_STICKY_TASK_LIST, () -> resetStickyTaskList(resetRequest)); } diff --git a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java index 612764e85..4121c9a8b 100644 --- a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java +++ b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java @@ -21,8 +21,19 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.uber.cadence.BadRequestError; +import com.uber.cadence.CancellationAlreadyRequestedError; +import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; +import com.uber.cadence.EntityNotExistsError; +import com.uber.cadence.QueryFailedError; +import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; +import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.common.RetryOptions; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.junit.Test; @@ -115,13 +126,16 @@ public void testAddDoNotRetry() throws InterruptedException { .setExpiration(Duration.ofSeconds(100)) .validateBuildWithDefaults(); options = options.addDoNotRetry(InterruptedException.class); - long start = System.currentTimeMillis(); + // need to use array (or an object) since we cannot change the + // value of the variable inside the lambda function. + int[] numberOfCalls = {0}; try { RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); result.completeExceptionally(new InterruptedException("simulated")); + ++numberOfCalls[0]; return result; }) .get(); @@ -130,7 +144,8 @@ public void testAddDoNotRetry() throws InterruptedException { assertTrue(e.getCause() instanceof InterruptedException); assertEquals("simulated", e.getCause().getMessage()); } - assertTrue(System.currentTimeMillis() - start < 100000); + // Make sure the error wasn't retried + assertTrue(numberOfCalls[0] == 1); } @Test @@ -140,13 +155,16 @@ public void testMaxAttempt() throws InterruptedException { .setInitialInterval(Duration.ofMillis(10)) .setMaximumInterval(Duration.ofMillis(100)) .setExpiration(Duration.ofMillis(500)) - .setMaximumAttempts(1) + .setMaximumAttempts(3) .validateBuildWithDefaults(); - long start = System.currentTimeMillis(); + // need to use array (or an object) since we cannot change the + // value of the variable inside the lambda function. + int[] numberOfCalls = {0}; try { RpcRetryer.retryWithResultAsync( options, () -> { + ++numberOfCalls[0]; throw new IllegalArgumentException("simulated"); }) .get(); @@ -155,6 +173,34 @@ public void testMaxAttempt() throws InterruptedException { assertTrue(e.getCause() instanceof IllegalArgumentException); assertEquals("simulated", e.getCause().getMessage()); } - assertTrue(System.currentTimeMillis() - start < 500); + // Make sure the error wasn't retried + assertTrue(numberOfCalls[0] == 3); + } + + @Test + public void testNonRetriableExceptionList() { + // Since we tested retry and no-retry logic works correctly above, + // In this test we just ensure the default options contain the right + // set of Exceptions as non-retriable so we can make sure that any + // change to that list would be intentional. + + List> noRetryExceptions = + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS.getDoNotRetry(); + List> expectedList = + new ArrayList<>( + Arrays.asList( + BadRequestError.class, + EntityNotExistsError.class, + WorkflowExecutionAlreadyCompletedError.class, + WorkflowExecutionAlreadyStartedError.class, + DomainAlreadyExistsError.class, + QueryFailedError.class, + DomainNotActiveError.class, + CancellationAlreadyRequestedError.class)); + + assertEquals(expectedList.size(), noRetryExceptions.size()); + for (Class exp : noRetryExceptions) { + assertTrue("Missing no retry exception in default options", expectedList.indexOf(exp) >= 0); + } } }