Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
package io.serverlessworkflow.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class AbstractExecutorServiceHolder implements ExecutorServiceFactory {

protected ExecutorService service;

@Override
public void close() {
public void close() throws InterruptedException {
if (service != null && !service.isShutdown()) {
service.shutdown();
service.awaitTermination(2, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,4 @@
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {
void close();
}
public interface ExecutorServiceFactory extends Supplier<ExecutorService>, AutoCloseable {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;

import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -47,9 +47,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
private volatile WorkflowModel output;
private Lock statusLock = new ReentrantLock();
private CompletableFuture<WorkflowModel> completableFuture;
private CompletableFuture<TaskContext> suspended;
private TaskContext suspendedTask;
private CompletableFuture<TaskContext> cancelled;
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;

WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
this.id = definition.application().idFactory().get();
Expand Down Expand Up @@ -88,11 +86,7 @@ private void whenFailed(WorkflowModel result, Throwable ex) {
}

private void handleException(Throwable ex) {
if (ex instanceof CancellationException) {
status.set(WorkflowStatus.CANCELLED);
publishEvent(
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
} else {
if (!(ex instanceof CancellationException)) {
status.set(WorkflowStatus.FAULTED);
publishEvent(
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
Expand All @@ -107,7 +101,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
.map(f -> f.apply(workflowContext, null, node))
.orElse(node);
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
status.set(WorkflowStatus.COMPLETED);
publishEvent(
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
return output;
Expand Down Expand Up @@ -174,9 +168,9 @@ public String toString() {
public boolean suspend() {
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
suspended = new CompletableFuture<TaskContext>();
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
if (TaskExecutorHelper.isActive(status.get()) && suspended == null) {
suspended = new ConcurrentHashMap<>();
status.set(WorkflowStatus.SUSPENDED);
publishEvent(
workflowContext,
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
Expand All @@ -193,40 +187,27 @@ public boolean suspend() {
public boolean resume() {
try {
statusLock.lock();
if (suspended != null) {
if (suspendedTask != null) {
CompletableFuture<TaskContext> toBeCompleted = suspended;
suspended = null;
toBeCompleted.complete(suspendedTask);
publishEvent(
workflowContext,
l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask)));
} else {
suspended = null;
}
if (TaskExecutorHelper.isActive(status.get()) && suspended != null) {
publishEvent(
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
suspended.forEach(
(k, v) -> {
k.complete(v);
});
suspended = null;
return true;
} else {
return false;
}
} finally {
statusLock.unlock();
}
return false;
}

public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
public CompletableFuture<TaskContext> cancelCheck(TaskContext t) {
try {
statusLock.lock();
if (suspended != null) {
suspendedTask = t;
publishEvent(
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
return suspended;
}
if (cancelled != null) {
cancelled = new CompletableFuture<TaskContext>();
workflowContext.instance().status(WorkflowStatus.CANCELLED);
if (status.get() == WorkflowStatus.CANCELLED) {
CompletableFuture<TaskContext> cancelled = new CompletableFuture<TaskContext>();
cancelled.completeExceptionally(
new CancellationException("Task " + t.taskName() + " has been cancelled"));
return cancelled;
Expand All @@ -237,12 +218,31 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
return CompletableFuture.completedFuture(t);
}

public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
try {
statusLock.lock();
if (suspended != null) {
CompletableFuture<TaskContext> suspendedTask = new CompletableFuture<TaskContext>();
suspended.put(suspendedTask, t);
return suspendedTask;
} else if (TaskExecutorHelper.isActive(status.get())) {
status.set(WorkflowStatus.RUNNING);
}
} finally {
statusLock.unlock();
}
return CompletableFuture.completedFuture(t);
}

@Override
public boolean cancel() {
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
cancelled = new CompletableFuture<TaskContext>();
status.set(WorkflowStatus.CANCELLED);
publishEvent(
workflowContext,
l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
return true;
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,4 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
T register(V builder, Consumer<CloudEvent> consumer);

void unregister(T register);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,4 @@

public interface EventPublisher extends AutoCloseable {
CompletableFuture<Void> publish(CloudEvent event);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void unregisterFromAll() {
}

@Override
public void close() {
public void close() throws Exception {
topicMap.clear();
serviceFactory.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public CompletableFuture<TaskContext> apply(
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
return executeNext(
completable
.thenCompose(workflowContext.instance()::suspendedCheck)
.thenApply(
t -> {
publishEvent(
Expand All @@ -208,16 +209,14 @@ public CompletableFuture<TaskContext> apply(
return t;
})
.thenCompose(t -> execute(workflowContext, t))
.thenCompose(t -> workflowContext.instance().completedChecks(t))
.thenCompose(workflowContext.instance()::cancelCheck)
.whenComplete(
(t, e) -> {
if (e != null) {
handleException(
workflowContext,
taskContext,
e instanceof CompletionException ? e.getCause() : e);
} else {
workflowContext.instance().status(WorkflowStatus.RUNNING);
}
})
.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public static boolean isActive(WorkflowContext context) {
}

public static boolean isActive(WorkflowStatus status) {
return status == WorkflowStatus.RUNNING || status == WorkflowStatus.WAITING;
return status == WorkflowStatus.RUNNING
|| status == WorkflowStatus.WAITING
|| status == WorkflowStatus.SUSPENDED;
}

public static TaskExecutor<?> createExecutorList(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.test;

import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
import static org.assertj.core.api.Assertions.assertThat;

import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowInstance;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowStatus;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ForkWaitTest {

private static WorkflowApplication appl;

@BeforeAll
static void init() throws IOException {
appl = WorkflowApplication.builder().build();
}

@AfterAll
static void tearDown() throws IOException {
appl.close();
}

@Test
void testForkWait() throws IOException, InterruptedException, ExecutionException {
assertModel(
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/fork-wait.yaml"))
.instance(Map.of())
.start()
.join());
}

@Test
void testForkWaitWithSuspend() throws IOException, InterruptedException {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was failing without the fix

Workflow workflow = readWorkflowFromClasspath("workflows-samples/fork-wait.yaml");
WorkflowInstance instance = appl.workflowDefinition(workflow).instance(Map.of());
CompletableFuture<WorkflowModel> future = instance.start();
Thread.sleep(50);
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
instance.suspend();
assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED);
Thread.sleep(200);
instance.resume();
WorkflowModel model = future.join();
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
assertModel(model);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use awaitability here next time we make a change. :p

}

private void assertModel(WorkflowModel current) {
assertThat((Collection<Map<String, Object>>) current.asJavaObject())
.containsExactlyInAnyOrderElementsOf(
List.of(
Map.of("helloBranch", Map.of("value", 1)),
Map.of("byeBranch", Map.of("value", 2))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.stream.Stream;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -40,6 +41,11 @@ static void init() {
appl = WorkflowApplication.builder().build();
}

@AfterAll
static void cleanup() {
appl.close();
}

@ParameterizedTest
@MethodSource("provideParameters")
void testWorkflowExecution(String fileName, Object input, Condition<Object> condition)
Expand Down
Loading