diff --git a/memq-actor/src/main/java/io/appform/memq/ActorSystem.java b/memq-actor/src/main/java/io/appform/memq/ActorSystem.java index 05ad293..986704d 100644 --- a/memq-actor/src/main/java/io/appform/memq/ActorSystem.java +++ b/memq-actor/src/main/java/io/appform/memq/ActorSystem.java @@ -2,6 +2,7 @@ import com.codahale.metrics.MetricRegistry; import io.appform.memq.actor.Actor; +import io.appform.memq.actor.DispatcherType; import io.appform.memq.actor.Message; import io.appform.memq.exceptionhandler.config.DropConfig; import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfigVisitor; @@ -30,6 +31,8 @@ public interface ActorSystem extends AutoCloseable { List registeredObservers(); + DispatcherType registeredDispatcher(String name); + boolean isRunning(); default List observers(String name, HighLevelActorConfig config, List observers) { diff --git a/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java b/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java index cf844a9..2dfbe75 100644 --- a/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java +++ b/memq-actor/src/main/java/io/appform/memq/HighLevelActor.java @@ -91,6 +91,7 @@ protected HighLevelActor(String name, this::sideline, actorSystem.createExceptionHandler(highLevelActorConfig, this::sideline), actorSystem.createRetryer(highLevelActorConfig), + actorSystem.registeredDispatcher(name), highLevelActorConfig.getPartitions(), highLevelActorConfig.getMaxSizePerPartition(), highLevelActorConfig.getMaxConcurrencyPerPartition(), diff --git a/memq-actor/src/main/java/io/appform/memq/actor/Actor.java b/memq-actor/src/main/java/io/appform/memq/actor/Actor.java index 0d07525..d894166 100644 --- a/memq-actor/src/main/java/io/appform/memq/actor/Actor.java +++ b/memq-actor/src/main/java/io/appform/memq/actor/Actor.java @@ -1,5 +1,6 @@ package io.appform.memq.actor; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import io.appform.memq.observer.ActorObserver; import io.appform.memq.observer.ActorObserverContext; @@ -11,18 +12,9 @@ import lombok.extern.slf4j.Slf4j; import lombok.val; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; @@ -37,7 +29,7 @@ public class Actor implements AutoCloseable { private final String name; private final ExecutorService executorService; - private final ExecutorService messageDispatcher; //TODO::Separate dispatch and add NoDispatch flow + private final Dispatcher messageDispatcher; private final ToIntFunction partitioner; private final Map> mailboxes; private final BiFunction validationHandler; @@ -57,6 +49,7 @@ public Actor( BiConsumer sidelineHandler, TriConsumer exceptionHandler, RetryStrategy retryer, + DispatcherType dispatcherType, int partitions, long maxSizePerPartition, int maxConcurrencyPerPartition, @@ -69,15 +62,19 @@ public Actor( Objects.requireNonNull(consumerHandler, "ConsumerHandler cannot be null"); Objects.requireNonNull(sidelineHandler, "SidelineHandler cannot be null"); Objects.requireNonNull(exceptionHandler, "ExceptionHandler cannot be null"); + Objects.requireNonNull(dispatcherType,"Dispatcher cannot be null"); + Preconditions.checkArgument( + dispatcherType != DispatcherType.SYNC || maxConcurrencyPerPartition == maxSizePerPartition, + "Max Queue size and max concurrency has to be same for sync dispatcher"); this.name = name; this.executorService = executorService; this.validationHandler = validationHandler; this.consumerHandler = consumerHandler; this.sidelineHandler = sidelineHandler; this.exceptionHandler = exceptionHandler; - this.messageDispatcher = Executors.newFixedThreadPool(partitions); this.retryer = retryer; this.partitioner = partitioner; + this.messageDispatcher = dispatcher(dispatcherType, partitions); this.mailboxes = IntStream.range(0, partitions) .boxed() .collect(Collectors.toMap(Function.identity(), i -> new Mailbox(this, i, maxSizePerPartition, maxConcurrencyPerPartition))); @@ -105,9 +102,7 @@ public final long inFlight() { } public final boolean isRunning() { - return mailboxes.values() - .stream() - .allMatch(Mailbox::isRunning); + return messageDispatcher.isRunning(); } public final void purge() { @@ -131,8 +126,71 @@ public final void start() { @Override public final void close() { mailboxes.values().forEach(Mailbox::close); + messageDispatcher.close(); + } + + private void processWithObserver(final InternalMessage internalMessage) { + val observerMessageMeta = new ObserverMessageMeta(internalMessage.getId(), internalMessage.getPublishedAt(), + internalMessage.getValidTill()); + this.rootObserver.execute(ActorObserverContext.builder() + .messageMeta(observerMessageMeta) + .message(internalMessage.getMessage()) + .operation(ActorOperation.CONSUME) + .actorName(this.name) + .build(), + () -> process(internalMessage)); } + private boolean process(final InternalMessage internalMessage) { + val id = internalMessage.getId(); + val message = internalMessage.getMessage(); + var status = false; + var messageMeta = new MessageMeta(internalMessage.getPublishedAt(), + internalMessage.getValidTill(), + internalMessage.getHeaders()); + try { + val valid = this.rootObserver.execute(ActorObserverContext.builder() + .message(message) + .operation(ActorOperation.VALIDATE) + .actorName(this.name) + .build(), + () -> this.validationHandler.apply(message, messageMeta)); + if (!valid) { + log.debug("Message validation failed for message: {}", message); + return false; + } + else { + status = this.retryer.execute(() -> { + messageMeta.incrementAttempt(); + return this.consumerHandler.apply(message, messageMeta); + }); + if (!status) { + log.debug("Consumer failed for message: {}", message); + this.rootObserver.execute(ActorObserverContext.builder() + .message(message) + .operation(ActorOperation.SIDELINE) + .actorName(this.name) + .build(), + () -> { + this.sidelineHandler.accept(message, messageMeta); + return true; + }); + } + } + } catch (Exception e) { + log.error("Error processing message : " + id, e); + this.rootObserver.execute(ActorObserverContext.builder() + .message(message) + .operation(ActorOperation.HANDLE_EXCEPTION) + .actorName(this.name) + .build(), + () -> { + this.exceptionHandler.accept(message, messageMeta, e); + return true; + }); + } + return status; + } private ActorObserver setupObserver(List observers) { //Terminal observer calls the actual method @@ -150,232 +208,308 @@ private ActorObserver setupObserver(List observers) { return startObserver; } + private Dispatcher dispatcher(final DispatcherType dispatcherType, final int partitions) { + return switch (dispatcherType) { + case SYNC -> new SyncDispatcher<>(partitions); + case ASYNC_ISOLATED -> new AsyncIsolatedThreadpoolDispatcher<>(partitions); + }; + } + private static class Mailbox implements AutoCloseable { private final Actor actor; + private final int partition; private final String name; private final long maxSize; private final int maxConcurrency; private final ReentrantLock lock = new ReentrantLock(); - private final Condition checkCondition = lock.newCondition(); private final Map> messages = new LinkedHashMap<>(); private final Set inFlight = new HashSet<>(); - private final AtomicBoolean stopped = new AtomicBoolean(); - private Future monitorFuture; public Mailbox(Actor actor, int partition, long maxSize, int maxConcurrency) { this.actor = actor; this.maxSize = maxSize; this.maxConcurrency = maxConcurrency; + this.partition = partition; this.name = actor.name + "-" + partition; } public final boolean isEmpty() { - lock.lock(); + acquireLock(); try { return messages.isEmpty(); } finally { - lock.unlock(); + releaseLock(); } } public final long size() { - lock.lock(); + acquireLock(); try { return messages.size(); } finally { - lock.unlock(); + releaseLock(); } } public final int inFlight() { - lock.lock(); + acquireLock(); try { return inFlight.size(); } finally { - lock.unlock(); + releaseLock(); } } public final void purge() { - lock.lock(); + acquireLock(); try { messages.clear(); } finally { - lock.unlock(); + releaseLock(); } } - public final boolean isRunning() { - return !stopped.get(); - } - public final void start() { - monitorFuture = actor.messageDispatcher.submit(this::monitor); + acquireLock(); + try { + actor.messageDispatcher.register(this); + } + finally { + releaseLock(); + } } public final boolean publish(final M message) { - lock.lock(); + acquireLock(); try { val currSize = messages.size(); - if (currSize >= this.maxSize) { + if (currSize >= maxSize) { log.warn("Blocking publish for as curr size:{} is more than specified threshold:{}", - currSize, this.maxSize); + currSize, maxSize); return false; } val internalMessage = new InternalMessage<>(message.id(), message.validTill(), System.currentTimeMillis(), message.headers(), message); messages.putIfAbsent(internalMessage.getId(), internalMessage); - checkCondition.signalAll(); + actor.messageDispatcher.triggerDispatch(this); + return true; } finally { - lock.unlock(); + releaseLock(); } - return true; } @Override public final void close() { - lock.lock(); + acquireLock(); try { - stopped.set(true); - checkCondition.signalAll(); + actor.messageDispatcher.unRegister(this); } finally { - lock.unlock(); + releaseLock(); } - if (null != monitorFuture) { - monitorFuture.cancel(true); - } - actor.messageDispatcher.shutdown(); } - private void monitor() { + private void releaseLock() { + lock.unlock(); + } + + private void acquireLock() { lock.lock(); + } + + private void releaseMessage(String id) { + acquireLock(); try { - while (true) { - //We can do the tests twice or just stop - //waiting after sometime and check the conditions anyway - //The set difference operation _might_ be expensive, hence going for the latter approach for now - //Can be changed in the future if needed - checkCondition.await(100, TimeUnit.MILLISECONDS); - if (stopped.get()) { - log.info("Actor {} monitor thread exiting", name); - return; - } - //Find new messages - val newInOrderedMessages = messages.keySet() - .stream() - .limit(this.maxConcurrency) - .collect(Collectors.toSet()); - val newMessageIds = Set.copyOf(Sets.difference(newInOrderedMessages, inFlight)); - if (newMessageIds.isEmpty()) { - if(inFlight.size() == this.maxConcurrency) { - log.warn("Reached max concurrency:{}. Ignoring consumption till inflight messages are consumed", - this.maxConcurrency); - } - else { - log.debug("No new messages. Neither is actor stopped. Ignoring spurious wakeup."); - } - continue; - } - inFlight.addAll(newMessageIds); - val messagesToBeDelivered = newMessageIds.stream() - .map(messages::get) - .toList(); - messagesToBeDelivered.forEach(internalMessage -> actor.executorService.submit(() -> { - val id = internalMessage.getId(); - try { - val observerMessageMeta = new ObserverMessageMeta(id, internalMessage.getPublishedAt(), - internalMessage.getValidTill()); - actor.rootObserver.execute(ActorObserverContext.builder() - .messageMeta(observerMessageMeta) - .message(internalMessage.getMessage()) - .operation(ActorOperation.CONSUME) - .actorName(actor.name) - .build(), - () -> process(internalMessage)); - } - catch (Throwable throwable) { - log.error("Error processing internalMessage", throwable); - } - finally { - releaseMessage(id); - } - })); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("Monitor thread stopped for {}", name); + inFlight.remove(id); + messages.remove(id); } finally { - lock.unlock(); + releaseLock(); } } + } - private boolean process(final InternalMessage internalMessage) { - val id = internalMessage.getId(); - val message = internalMessage.getMessage(); - var status = false; - var messageMeta = new MessageMeta(internalMessage.getPublishedAt(), - internalMessage.getValidTill(), - internalMessage.getHeaders()); - try { - val valid = actor.rootObserver.execute(ActorObserverContext.builder() - .message(message) - .operation(ActorOperation.VALIDATE) - .actorName(actor.name) - .build(), - () -> actor.validationHandler.apply(message, messageMeta)); - if (!valid) { - log.debug("Message validation failed for message: {}", message); - return false; + interface Dispatcher { + void register(Mailbox inMailbox); + void unRegister(Mailbox inMailbox); + void triggerDispatch(Mailbox inMailbox); + boolean isRunning(); + void close(); + + //Always executed inside mailbox lock + default void dispatch(final Mailbox mailbox) { + //Find new messages + val newInOrderedMessages = mailbox.messages.keySet() + .stream() + .limit(mailbox.maxConcurrency) + .collect(Collectors.toSet()); + val newMessageIds = Set.copyOf(Sets.difference(newInOrderedMessages, mailbox.inFlight)); + if (newMessageIds.isEmpty()) { + if(mailbox.inFlight.size() == mailbox.maxConcurrency) { + log.warn("Reached max concurrency:{}. Ignoring consumption till inflight messages are consumed", + mailbox.maxConcurrency); } else { - status = actor.retryer.execute(() -> { - messageMeta.incrementAttempt(); - return actor.consumerHandler.apply(message, messageMeta); - }); - if (!status) { - log.debug("Consumer failed for message: {}", message); - actor.rootObserver.execute(ActorObserverContext.builder() - .message(message) - .operation(ActorOperation.SIDELINE) - .actorName(actor.name) - .build(), - () -> { - actor.sidelineHandler.accept(message, messageMeta); - return true; - }); - } + log.debug("No new messages. Neither is actor stopped. Ignoring spurious dispatch."); } - } catch (Exception e) { - log.error("Error processing message : " + id, e); - actor.rootObserver.execute(ActorObserverContext.builder() - .message(message) - .operation(ActorOperation.HANDLE_EXCEPTION) - .actorName(actor.name) - .build(), - () -> { - actor.exceptionHandler.accept(message, messageMeta, e); - return true; - }); + return; } - return status; + mailbox.inFlight.addAll(newMessageIds); + val messagesToBeDelivered = newMessageIds.stream() + .map(mailbox.messages::get) + .toList(); + messagesToBeDelivered.forEach(internalMessage -> mailbox.actor.executorService.submit(() -> { + val id = internalMessage.getId(); + try { + mailbox.actor.processWithObserver(internalMessage); + } + catch (Throwable throwable) { + log.error("Error processing internalMessage", throwable); + } + finally { + mailbox.releaseMessage(id); + } + })); } + } - private void releaseMessage(String id) { - lock.lock(); - try { - inFlight.remove(id); - messages.remove(id); + private static class SyncDispatcher implements Dispatcher { + + private final Map> registeredMailbox; + + public SyncDispatcher(int partition){ + registeredMailbox = new HashMap<>(partition); + } + + @Override + public void register(final Mailbox inMailbox) { + registeredMailbox.putIfAbsent(inMailbox.partition, inMailbox); + } + + @Override + public void unRegister(final Mailbox inMailbox) { + registeredMailbox.remove(inMailbox.partition); + } + + @Override + public void triggerDispatch(final Mailbox inMailbox) { + //Sync dispatch is executed within mailbox lock + dispatch(inMailbox); + } + + @Override + public boolean isRunning() { + return !registeredMailbox.isEmpty(); + } + + @Override + public void close() { + } + } + + private static class AsyncIsolatedThreadpoolDispatcher implements Dispatcher { + private final ExecutorService executorService; + private final Map> registeredMailboxWorker; + + public AsyncIsolatedThreadpoolDispatcher(int inPartitions) { + this.executorService = Executors.newFixedThreadPool(inPartitions); + this.registeredMailboxWorker = new HashMap<>(inPartitions); + } + + @Override + public final void register(final Mailbox inMailbox) { + val mailBoxAsyncDispatcherWorker = new AsyncDispatcherWorker<>(inMailbox, this); + registeredMailboxWorker.putIfAbsent(inMailbox.partition, mailBoxAsyncDispatcherWorker); + mailBoxAsyncDispatcherWorker.start(executorService::submit); + } + + @Override + public final void unRegister(final Mailbox inMailbox) { + if(registeredMailboxWorker.containsKey(inMailbox.partition)) { + registeredMailboxWorker.get(inMailbox.partition).close(); + registeredMailboxWorker.remove(inMailbox.partition); } - finally { - lock.unlock(); + } + + @Override + public final void triggerDispatch(final Mailbox inMailbox) { + registeredMailboxWorker.get(inMailbox.partition).trigger(); + } + + @Override + public final boolean isRunning() { + return !registeredMailboxWorker.isEmpty() + && registeredMailboxWorker.values() + .stream() + .allMatch(AsyncDispatcherWorker::isRunning); + } + + + @Override + public final void close() { + executorService.shutdown(); + } + + public static class AsyncDispatcherWorker implements AutoCloseable { + private final Mailbox mailbox; + private final AtomicBoolean stopped; + private final Condition checkCondition; + private Future monitoredFuture; + + public AsyncDispatcherWorker(Mailbox inMailbox, Dispatcher inDispatcher) { + mailbox = inMailbox; + stopped = new AtomicBoolean(false); + checkCondition = inMailbox.lock.newCondition(); + } + + public final void start(final Function> taskSubmitter) { + monitoredFuture = taskSubmitter.apply(this::monitor); + } + + public final void close() { + stopped.set(true); + checkCondition.signalAll(); + monitoredFuture.cancel(true); + } + + public final void trigger() { + checkCondition.signalAll(); + } + + public final boolean isRunning() { + return !stopped.get(); + } + + private void monitor() { + val name = mailbox.name; + mailbox.acquireLock(); + try { + while (true) { + //We can do the tests twice or just stop + //waiting after sometime and check the conditions anyway + //The set difference operation _might_ be expensive, hence going for the latter approach for now + //Can be changed in the future if needed + checkCondition.await(100, TimeUnit.MILLISECONDS); + if (stopped.get()) { + log.info("Actor {} monitor thread exiting", name); + return; + } + mailbox.actor.messageDispatcher.dispatch(mailbox); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Monitor thread stopped for {}", name); + } + finally { + mailbox.releaseLock(); + } } } } + } diff --git a/memq-actor/src/main/java/io/appform/memq/actor/DispatcherType.java b/memq-actor/src/main/java/io/appform/memq/actor/DispatcherType.java new file mode 100644 index 0000000..82a5505 --- /dev/null +++ b/memq-actor/src/main/java/io/appform/memq/actor/DispatcherType.java @@ -0,0 +1,6 @@ +package io.appform.memq.actor; + +public enum DispatcherType { + SYNC, + ASYNC_ISOLATED, +} diff --git a/memq-actor/src/test/java/io/appform/memq/ActorConcurrencyTest.java b/memq-actor/src/test/java/io/appform/memq/ActorConcurrencyTest.java index f3f794b..51b4261 100644 --- a/memq-actor/src/test/java/io/appform/memq/ActorConcurrencyTest.java +++ b/memq-actor/src/test/java/io/appform/memq/ActorConcurrencyTest.java @@ -2,16 +2,18 @@ import com.codahale.metrics.Meter; import io.appform.memq.actor.ActorOperation; +import io.appform.memq.actor.DispatcherType; import io.appform.memq.helper.TestUtil; import io.appform.memq.helper.message.TestIntMessage; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -21,30 +23,72 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Slf4j -@ExtendWith(MemQTestExtension.class) public class ActorConcurrencyTest { @Test - public void testMaxConcurrency(ActorSystem actorSystem) { + @SneakyThrows + public void testMaxConcurrencyAsyncDispactcher() { val concurrency = ThreadLocalRandom.current().nextInt(1, 5); val metricPrefix = "actor." + TestUtil.HighLevelActorType.BLOCKING_ACTOR.name() + "."; val counter = new AtomicInteger(); val sideline = new AtomicBoolean(); val blockConsume = new AtomicBoolean(true); - val actorConfig = TestUtil.noRetryActorConfig(Constants.SINGLE_PARTITION, false, Long.MAX_VALUE, concurrency); - val actor = TestUtil.blockingActor(counter, sideline, blockConsume, - actorConfig, actorSystem, List.of()); - IntStream.range(0, 10).boxed().forEach(i -> { - val publish = actor.publish(new TestIntMessage(i)); - assertTrue(publish); - }); - assertEquals(concurrency, actor.inFlight()); - blockConsume.set(false); - Awaitility.await() - .timeout(Duration.ofMinutes(1)) - .catchUncaughtExceptions() - .until(actor::isEmpty); - val metrics = actorSystem.metricRegistry().getMetrics(); - assertEquals(10, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".total")).getCount()); + val actorConfig = TestUtil.noRetryActorConfig(Constants.SINGLE_PARTITION, false, Long.MAX_VALUE, concurrency); + val tc = Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE); + try (val actorSystem = TestUtil.actorSystem(tc, DispatcherType.ASYNC_ISOLATED)) { + val actor = TestUtil.blockingActor(counter, sideline, blockConsume, + actorConfig, actorSystem, List.of()); + IntStream.range(0, 10).boxed().forEach(i -> { + val publish = actor.publish(new TestIntMessage(i)); + assertTrue(publish); + }); + assertEquals(concurrency, actor.inFlight()); + blockConsume.set(false); + Awaitility.await() + .timeout(Duration.ofMinutes(1)) + .catchUncaughtExceptions() + .until(actor::isEmpty); + val metrics = actorSystem.metricRegistry().getMetrics(); + assertEquals(10, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".total")).getCount()); + } + } + + @Test + @SneakyThrows + public void testMaxConcurrencySyncDispatcher() { + val concurrency = ThreadLocalRandom.current().nextInt(1, 5); + val metricPrefix = "actor." + TestUtil.HighLevelActorType.BLOCKING_ACTOR.name() + "."; + val counter = new AtomicInteger(); + val sideline = new AtomicBoolean(); + val blockConsume = new AtomicBoolean(true); + val actorConfig = TestUtil.noRetryActorConfig(Constants.SINGLE_PARTITION, false, concurrency, concurrency); + val tc = Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE); + try (val actorSystem = TestUtil.actorSystem(tc, DispatcherType.SYNC)) { + val actor = TestUtil.blockingActor(counter, sideline, blockConsume, + actorConfig, actorSystem, List.of()); + val concurrencyBreached = new AtomicInteger(0); + IntStream.range(0, 10).boxed().forEach(i -> { + val message = new TestIntMessage(i); + while (!actor.publish(message)) { + log.debug("Publish failed, retrying for message:{}", message); + if (actor.inFlight() == actorConfig.getMaxConcurrencyPerPartition()) { + log.debug("Unlocking consume as max currency is achieved while publishing message:{}", message); + concurrencyBreached.incrementAndGet(); + blockConsume.set(false); + } + } + if (!blockConsume.get()) { + blockConsume.set(true); + } + }); + assertTrue(concurrencyBreached.get() > 0); + blockConsume.set(false); + Awaitility.await() + .timeout(Duration.ofMinutes(1)) + .catchUncaughtExceptions() + .until(actor::isEmpty); + val metrics = actorSystem.metricRegistry().getMetrics(); + assertEquals(10, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".success")).getCount()); + } } } diff --git a/memq-actor/src/test/java/io/appform/memq/HighLevelActorTest.java b/memq-actor/src/test/java/io/appform/memq/HighLevelActorTest.java index eb146c6..e96e04d 100644 --- a/memq-actor/src/test/java/io/appform/memq/HighLevelActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/HighLevelActorTest.java @@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j; import lombok.val; import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; @@ -35,19 +35,19 @@ enum HighLevelActorType { static final int THREAD_POOL_SIZE = 10; - @Test + @TestTemplate @SneakyThrows void testSuccessSinglePartition(ActorSystem actorSystem) { testSuccess(1, actorSystem); } - @Test + @TestTemplate @SneakyThrows void testSuccessMultiPartition(ActorSystem actorSystem) { testSuccess(4, actorSystem); } - @Test + @TestTemplate @SneakyThrows void testBoundedMailboxActorTest(ActorSystem actorSystem) { val metricPrefix = "actor." + TestUtil.HighLevelActorType.BLOCKING_ACTOR.name() + "."; @@ -55,7 +55,7 @@ void testBoundedMailboxActorTest(ActorSystem actorSystem) { val sideline = new AtomicBoolean(); val blockConsume = new AtomicBoolean(true); val actorConfig = TestUtil.noRetryActorConfig(Constants.SINGLE_PARTITION, false, - 1L); + 1); val actor = TestUtil.blockingActor(counter, sideline, blockConsume, actorConfig, actorSystem, List.of()); val publish = actor.publish(new TestIntMessage(1)); @@ -74,7 +74,7 @@ void testBoundedMailboxActorTest(ActorSystem actorSystem) { val metrics = actorSystem.metricRegistry().getMetrics(); assertEquals(11, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".total")).getCount()); assertEquals(1, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".success")).getCount()); - assertEquals(10, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".failed")).getCount()); + assertEquals(10, ((Meter) metrics.get(metricPrefix + ActorOperation.PUBLISH.name() + ".failed")).getCount()); } @@ -88,9 +88,12 @@ void testSuccess(int partition, ActorSystem actorSystem) { IntStream.rangeClosed(1, 10) .forEach(i -> IntStream.rangeClosed(1, 1000).forEach(j -> tp.submit(() -> a.publish(new TestIntMessage(1))))); Awaitility.await() - .timeout(Duration.ofMinutes(1)) + .timeout(Duration.ofSeconds(10)) .catchUncaughtExceptions() - .until(() -> sum.get() == 10_000); + .until(() -> { + log.info("Test adder sum:{}", sum.get()); + return sum.get() == 10_000; + }); log.info("Test took {} ms", s.elapsed().toMillis()); assertEquals(10_000, sum.get()); @@ -99,13 +102,13 @@ void testSuccess(int partition, ActorSystem actorSystem) { } } - @Test + @TestTemplate void testPurge(ActorSystem actorSystem) { val counter = new AtomicInteger(); val sideline = new AtomicBoolean(); val blockConsume = new AtomicBoolean(true); val actorConfig = TestUtil.noRetryActorConfig(Constants.SINGLE_PARTITION, false, - 1L); + 1); val actor = TestUtil.blockingActor(counter, sideline, blockConsume, actorConfig, actorSystem, List.of()); val publish = actor.publish(new TestIntMessage(1)); diff --git a/memq-actor/src/test/java/io/appform/memq/MemQTestExtension.java b/memq-actor/src/test/java/io/appform/memq/MemQTestExtension.java index 6348f0f..63bb108 100644 --- a/memq-actor/src/test/java/io/appform/memq/MemQTestExtension.java +++ b/memq-actor/src/test/java/io/appform/memq/MemQTestExtension.java @@ -1,16 +1,20 @@ package io.appform.memq; +import io.appform.memq.actor.DispatcherType; import io.appform.memq.helper.TestUtil; import org.awaitility.Awaitility; import org.junit.jupiter.api.extension.*; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executors; +import java.util.stream.Stream; /** * */ -public class MemQTestExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { +public class MemQTestExtension implements BeforeEachCallback, AfterEachCallback, TestTemplateInvocationContextProvider { private ActorSystem actorSystem; @Override @@ -24,20 +28,46 @@ public void afterEach(ExtensionContext extensionContext) throws Exception { @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - actorSystem = TestUtil.actorSystem(Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE)); } @Override - public boolean supportsParameter( - ParameterContext parameterContext, - ExtensionContext extensionContext) throws ParameterResolutionException { - return parameterContext.getParameter().getType().equals(ActorSystem.class); + public boolean supportsTestTemplate(ExtensionContext extensionContext) { + return true; } @Override - public Object resolveParameter( - ParameterContext parameterContext, - ExtensionContext extensionContext) throws ParameterResolutionException { - return actorSystem; + public Stream provideTestTemplateInvocationContexts(ExtensionContext extensionContext) { + return Arrays.stream(DispatcherType.values()) + .map(dispatcherType -> createContext(dispatcherType, extensionContext)); + } + + private TestTemplateInvocationContext createContext(DispatcherType dispatcherType, ExtensionContext extensionContext) { + return new TestTemplateInvocationContext() { + @Override + public String getDisplayName(int invocationIndex) { + return "Executing with dispatcherType: " + dispatcherType.name(); + } + + @Override + public List getAdditionalExtensions() { + return List.of(new ParameterResolver() { + @Override + public boolean supportsParameter( + ParameterContext parameterContext, + ExtensionContext extensionContext) throws ParameterResolutionException { + return parameterContext.getParameter().getType().equals(ActorSystem.class); + } + + @Override + public Object resolveParameter( + ParameterContext parameterContext, + ExtensionContext extensionContext) throws ParameterResolutionException { + actorSystem = TestUtil.actorSystem(Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE), + dispatcherType); + return actorSystem; + } + }); + } + }; } } diff --git a/memq-actor/src/test/java/io/appform/memq/MessageMetaTest.java b/memq-actor/src/test/java/io/appform/memq/MessageMetaTest.java index 41bb59d..450ade8 100644 --- a/memq-actor/src/test/java/io/appform/memq/MessageMetaTest.java +++ b/memq-actor/src/test/java/io/appform/memq/MessageMetaTest.java @@ -1,5 +1,6 @@ package io.appform.memq; +import io.appform.memq.actor.DispatcherType; import io.appform.memq.actor.Message; import io.appform.memq.actor.MessageMeta; import io.appform.memq.exceptionhandler.config.SidelineConfig; @@ -36,7 +37,7 @@ void testMessageMetaWithRetry() throws Exception { val recordedPublishTime = new AtomicLong(-1); val publisherTime = new AtomicLong(-1); val tc = Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE); - try (val actorSystem = TestUtil.actorSystem(tc)) { + try (val actorSystem = TestUtil.actorSystem(tc, TestUtil.DEFAULT_DISPATCHER)) { val highLevelActorConfig = HighLevelActorConfig.builder() .partitions(Constants.SINGLE_PARTITION) .maxSizePerPartition(Long.MAX_VALUE) diff --git a/memq-actor/src/test/java/io/appform/memq/actor/ActorTest.java b/memq-actor/src/test/java/io/appform/memq/actor/ActorTest.java index 29e4e1e..adb7d37 100644 --- a/memq-actor/src/test/java/io/appform/memq/actor/ActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/actor/ActorTest.java @@ -29,23 +29,36 @@ class ActorTest { @Test @SneakyThrows - void testSuccessSinglePartition() { - testSuccess(1); + void testSuccessSinglePartitionAsyncDispacther() { + testSuccess(1, DispatcherType.ASYNC_ISOLATED); } @Test @SneakyThrows - void testSuccessMultiPartition() { - testSuccess(4); + void testSuccessMultiPartitionAsyncDispacther() { + testSuccess(4, DispatcherType.ASYNC_ISOLATED); } + @Test + @SneakyThrows + void testSuccessSinglePartitionSyncDispacther() { + testSuccess(1, DispatcherType.SYNC); + } + + @Test @SneakyThrows - void testSuccess(int partition) { + void testSuccessMultiPartitionSyncDispacther() { + testSuccess(4, DispatcherType.SYNC); + } + + + @SneakyThrows + void testSuccess(int partition, DispatcherType dispatcherType) { val sum = new AtomicInteger(0); val tp = Executors.newFixedThreadPool(THREADPOOL_SIZE); val tc = Executors.newFixedThreadPool(THREADPOOL_SIZE); - try (val a = adder(sum, partition, tc)) { + try (val a = adder(sum, partition, tc, dispatcherType)) { a.start(); val s = Stopwatch.createStarted(); IntStream.rangeClosed(1, 10) @@ -64,7 +77,7 @@ void testSuccess(int partition) { } } - static Actor adder(final AtomicInteger sum, int partition, ExecutorService tc) { + static Actor adder(final AtomicInteger sum, int partition, ExecutorService tc, DispatcherType dispatcherType) { return new Actor<>("Adder", tc, (message, messageMeta) -> true, @@ -77,8 +90,9 @@ static Actor adder(final AtomicInteger sum, int partition, Execu (message, messageMeta, throwable) -> { }, new NoRetryStrategy(new NoRetryConfig()), + dispatcherType, partition, - Long.MAX_VALUE, + Integer.MAX_VALUE, Integer.MAX_VALUE, message -> Math.absExact(message.id().hashCode()) % partition, new ArrayList<>()); diff --git a/memq-actor/src/test/java/io/appform/memq/exceptionHandler/ExceptionHandlingActorTest.java b/memq-actor/src/test/java/io/appform/memq/exceptionHandler/ExceptionHandlingActorTest.java index b36d1ad..72adc01 100644 --- a/memq-actor/src/test/java/io/appform/memq/exceptionHandler/ExceptionHandlingActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/exceptionHandler/ExceptionHandlingActorTest.java @@ -11,7 +11,7 @@ import lombok.SneakyThrows; import lombok.val; import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; @@ -24,13 +24,13 @@ @ExtendWith(MemQTestExtension.class) class ExceptionHandlingActorTest { - @Test + @TestTemplate void testSidelineExceptionConfig(ActorSystem actorSystem) { val sideline = triggerMessageToExceptionActor(new SidelineConfig(), actorSystem); assertTrue(sideline.get()); } - @Test + @TestTemplate void testDropExceptionConfig(ActorSystem actorSystem) { val sideline = triggerMessageToExceptionActor(new DropConfig(), actorSystem); assertFalse(sideline.get()); diff --git a/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java b/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java index 29ab953..768f018 100644 --- a/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java +++ b/memq-actor/src/test/java/io/appform/memq/helper/TestUtil.java @@ -6,6 +6,7 @@ import io.appform.memq.HighLevelActor; import io.appform.memq.actor.Actor; import io.appform.memq.HighLevelActorConfig; +import io.appform.memq.actor.DispatcherType; import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfig; import io.appform.memq.exceptionhandler.config.SidelineConfig; import io.appform.memq.helper.message.TestIntMessage; @@ -33,8 +34,9 @@ public enum HighLevelActorType { public static final String GLOBAL_EXECUTOR_SERVICE_GROUP = "global"; public static final int DEFAULT_THREADPOOL_SIZE = 2; + public static final DispatcherType DEFAULT_DISPATCHER = DispatcherType.ASYNC_ISOLATED; - public static ActorSystem actorSystem(ExecutorService tp) { + public static ActorSystem actorSystem(ExecutorService tp, DispatcherType dispatcherType) { val metricRegistry = new MetricRegistry(); return new ActorSystem() { private final RetryStrategyFactory retryStrategyFactory = new RetryStrategyFactory(); @@ -66,6 +68,11 @@ public List registeredObservers() { return List.of(); } + @Override + public DispatcherType registeredDispatcher(String name) { + return dispatcherType; + } + @Override public boolean isRunning() { return !registeredActors.isEmpty() && registeredActors.stream().allMatch(Actor::isRunning); @@ -93,7 +100,7 @@ public static HighLevelActor blockingActor(f protected boolean handle(TestIntMessage message, MessageMeta messageMeta) { counter.addAndGet(message.getValue()); while(blockConsume.get()) { - Awaitility.waitAtMost(Duration.ofMillis(100)); + Awaitility.waitAtMost(Duration.ofMillis(20)); } return true; } @@ -172,13 +179,13 @@ public static HighLevelActorConfig noRetryActorConfig(int partition) { public static HighLevelActorConfig noRetryActorConfig(int partition, boolean metricDisabled, ExceptionHandlerConfig exceptionHandlerConfig) { - return noRetryActorConfig(partition, metricDisabled, exceptionHandlerConfig, Long.MAX_VALUE, Integer.MAX_VALUE); + return noRetryActorConfig(partition, metricDisabled, exceptionHandlerConfig, Integer.MAX_VALUE, Integer.MAX_VALUE); } public static HighLevelActorConfig noRetryActorConfig(int partition, boolean metricDisabled, - long maxSizePerPartition) { - return noRetryActorConfig(partition, metricDisabled, new SidelineConfig(), maxSizePerPartition, Integer.MAX_VALUE); + int maxSizePerPartition) { + return noRetryActorConfig(partition, metricDisabled, new SidelineConfig(), maxSizePerPartition, maxSizePerPartition); } public static HighLevelActorConfig noRetryActorConfig(int partition, diff --git a/memq-actor/src/test/java/io/appform/memq/observer/MetricObserverActorTest.java b/memq-actor/src/test/java/io/appform/memq/observer/MetricObserverActorTest.java index 56bcba1..90d2224 100644 --- a/memq-actor/src/test/java/io/appform/memq/observer/MetricObserverActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/observer/MetricObserverActorTest.java @@ -10,7 +10,7 @@ import lombok.SneakyThrows; import lombok.val; import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; @@ -22,7 +22,7 @@ @ExtendWith(MemQTestExtension.class) class MetricObserverActorTest { - @Test + @TestTemplate @SneakyThrows void testMetrics(ActorSystem actorSystem) { val actorName = TestUtil.HighLevelActorType.EXCEPTION_ACTOR.name(); @@ -57,7 +57,7 @@ void testMetrics(ActorSystem actorSystem) { assertEquals(actorName, actor.getType().name()); } - @Test + @TestTemplate @SneakyThrows void testNoMetrics(ActorSystem actorSystem) { val counter = new AtomicInteger(); diff --git a/memq-actor/src/test/java/io/appform/memq/retry/RetryActorTest.java b/memq-actor/src/test/java/io/appform/memq/retry/RetryActorTest.java index aa5ca2f..19f6e90 100644 --- a/memq-actor/src/test/java/io/appform/memq/retry/RetryActorTest.java +++ b/memq-actor/src/test/java/io/appform/memq/retry/RetryActorTest.java @@ -13,7 +13,6 @@ import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -121,7 +120,7 @@ AtomicInteger triggerMessageToExceptionActor(RetryConfig retryConfig) { val counter = new AtomicInteger(); val sideline = new AtomicBoolean(); val tc = Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE); - try (val actorSystem = TestUtil.actorSystem(tc)) { + try (val actorSystem = TestUtil.actorSystem(tc, TestUtil.DEFAULT_DISPATCHER)) { val highLevelActorConfig = getHighLevelActorConfig(retryConfig); val actor = TestUtil.allExceptionActor(counter, sideline, highLevelActorConfig, actorSystem); @@ -139,14 +138,14 @@ AtomicInteger triggerMessageToSuccessAfterNumberOfExceptionsActor(RetryConfig re val counter = new AtomicInteger(); val sideline = new AtomicBoolean(); val tc = Executors.newFixedThreadPool(TestUtil.DEFAULT_THREADPOOL_SIZE); - try (val actorSystem = TestUtil.actorSystem(tc)) { + try (val actorSystem = TestUtil.actorSystem(tc, TestUtil.DEFAULT_DISPATCHER)) { val highLevelActorConfig = getHighLevelActorConfig(retryConfig); val actor = TestUtil.successAfterNumberOfExceptionsActor(counter, sideline, highLevelActorConfig, actorSystem, MAX_NUMBER_OF_EXCEPTIONS); actor.publish(new TestIntMessage(1)); Awaitility.await() .pollDelay(Duration.ofMillis(5)) - .timeout(Duration.ofMillis(10)) + .timeout(Duration.ofMillis(500)) .catchUncaughtExceptions() .until(actor::isEmpty); return counter; diff --git a/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java b/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java index 8a752ac..e04f28a 100644 --- a/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java +++ b/memq-dw-bundle/src/main/java/io/appform/MemqActorSystem.java @@ -6,6 +6,7 @@ import io.appform.memq.ActorSystem; import io.appform.memq.actor.Actor; import io.appform.memq.HighLevelActorConfig; +import io.appform.memq.actor.DispatcherType; import io.appform.memq.observer.ActorObserver; import io.appform.memq.retry.RetryStrategy; import io.appform.memq.retry.RetryStrategyFactory; @@ -87,6 +88,11 @@ public List registeredObservers() { return List.copyOf(this.actorObservers); } + @Override + public DispatcherType registeredDispatcher(String name) { + return DispatcherType.ASYNC_ISOLATED; + } + @Override public boolean isRunning() { return !registeredActors.isEmpty() && registeredActors.stream().allMatch(Actor::isRunning);