Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions memq-actor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.13.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
Expand Down
6 changes: 4 additions & 2 deletions memq-actor/src/main/java/io/appform/memq/ActorSystem.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.appform.memq;

import com.codahale.metrics.MetricRegistry;
import io.appform.memq.actor.Actor;
import io.appform.memq.actor.IActor;
import io.appform.memq.actor.Message;
import io.appform.memq.exceptionhandler.config.DropConfig;
import io.appform.memq.exceptionhandler.config.ExceptionHandlerConfigVisitor;
import io.appform.memq.exceptionhandler.config.SidelineConfig;
import io.appform.memq.actor.MessageMeta;
import io.appform.memq.hierarchical.IHierarchicalActor;
import io.appform.memq.observer.ActorObserver;
import io.appform.memq.retry.RetryStrategy;
import io.appform.memq.stats.ActorMetricObserver;
Expand All @@ -20,7 +21,8 @@

public interface ActorSystem extends AutoCloseable {

void register(Actor<?> actor);
void register(IActor<?> actor);
void register(IHierarchicalActor<?> actor);

ExecutorService createOrGetExecutorService(HighLevelActorConfig config);

Expand Down
3 changes: 2 additions & 1 deletion memq-actor/src/main/java/io/appform/memq/HighLevelActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import io.appform.memq.actor.Actor;
import io.appform.memq.actor.IActor;
import io.appform.memq.actor.Message;
import io.appform.memq.actor.MessageMeta;
import io.appform.memq.observer.ActorObserver;
Expand All @@ -16,7 +17,7 @@ public abstract class HighLevelActor<MessageType extends Enum<MessageType>, M ex

@Getter
private final MessageType type;
private final Actor<M> actor;
protected final IActor<M> actor;

@SuppressWarnings("unused")
protected HighLevelActor(
Expand Down
22 changes: 11 additions & 11 deletions memq-actor/src/main/java/io/appform/memq/HighLevelActorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,48 @@
import io.appform.memq.retry.config.NoRetryConfig;
import io.appform.memq.retry.config.RetryConfig;
import lombok.*;
import lombok.extern.jackson.Jacksonized;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Value
@Builder
@Jacksonized
@Data
@EqualsAndHashCode
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class HighLevelActorConfig {

@Min(1)
@Max(100)
@Builder.Default
int partitions = 1;
private int partitions = 1;

@Min(1)
@Builder.Default
long maxSizePerPartition = Long.MAX_VALUE;
private long maxSizePerPartition = Long.MAX_VALUE;

@Min(1)
@Builder.Default
int maxConcurrencyPerPartition = Integer.MAX_VALUE;
private int maxConcurrencyPerPartition = Integer.MAX_VALUE;

@Valid
@NotNull
@Builder.Default
RetryConfig retryConfig = new NoRetryConfig();
private RetryConfig retryConfig = new NoRetryConfig();

@Valid
@NotNull
@Builder.Default
ExceptionHandlerConfig exceptionHandlerConfig = new DropConfig();
private ExceptionHandlerConfig exceptionHandlerConfig = new DropConfig();

@NotNull
@Builder.Default
String executorName = "default";
private String executorName = "default";

@Builder.Default
boolean metricDisabled = false;
private boolean metricDisabled = false;

}
9 changes: 8 additions & 1 deletion memq-actor/src/main/java/io/appform/memq/actor/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.stream.IntStream;

@Slf4j
public class Actor<M extends Message> implements AutoCloseable {
public class Actor<M extends Message> implements IActor<M> {

private final String name;
private final ExecutorService executorService;
Expand Down Expand Up @@ -84,36 +84,42 @@ public Actor(
this.rootObserver = setupObserver(observers);
}

@Override
public final boolean isEmpty() {
return mailboxes.values()
.stream()
.allMatch(Mailbox::isEmpty);
}

@Override
public final long size() {
return mailboxes.values()
.stream()
.mapToLong(Mailbox::size)
.sum();
}

@Override
public final long inFlight() {
return mailboxes.values()
.stream()
.mapToLong(Mailbox::inFlight)
.sum();
}

@Override
public final boolean isRunning() {
return mailboxes.values()
.stream()
.allMatch(Mailbox::isRunning);
}

@Override
public final void purge() {
mailboxes.values().forEach(Mailbox::purge);
}

@Override
public final boolean publish(final M message) {
return rootObserver.execute(ActorObserverContext.builder()
.operation(ActorOperation.PUBLISH)
Expand All @@ -124,6 +130,7 @@ public final boolean publish(final M message) {
.publish(message));
}

@Override
public final void start() {
mailboxes.values().forEach(Mailbox::start);
}
Expand Down
15 changes: 15 additions & 0 deletions memq-actor/src/main/java/io/appform/memq/actor/IActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.appform.memq.actor;

public interface IActor<M extends Message> extends AutoCloseable {

void start();
void close();

boolean isEmpty();
long size();
long inFlight();
boolean isRunning();
void purge();

boolean publish(final M message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.appform.memq.hierarchical;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.appform.memq.HighLevelActorConfig;
import io.appform.memq.hierarchical.tree.HierarchicalDataStoreTreeNode;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
@EqualsAndHashCode
@ToString
@NoArgsConstructor
public class HierarchialHighLevelActorConfig extends HighLevelActorConfig {

/**
* <p>This param will reused all Parent Level ActorConfig while creating all child actors,
* if marked as false, every children will need tp provide Actor config specific to child</p>
*
*/
private boolean useParentConfigInWorker = true;

@JsonUnwrapped
private HierarchicalDataStoreTreeNode<String, HierarchicalOperationWorkerConfig> childrenData;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.appform.memq.hierarchical;

import io.appform.memq.ActorSystem;
import io.appform.memq.actor.Message;
import io.appform.memq.actor.MessageMeta;
import io.appform.memq.hierarchical.tree.HierarchicalDataStoreSupplierTree;
import io.appform.memq.hierarchical.tree.HierarchicalTreeConfig;
import io.appform.memq.hierarchical.tree.key.HierarchicalRoutingKey;
import io.appform.memq.hierarchical.tree.key.RoutingKey;
import io.appform.memq.observer.ActorObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.ToIntFunction;

@Slf4j
public class HierarchicalActor<MessageType extends Enum<MessageType>, M extends Message> implements IHierarchicalActor<M> {

public static final RoutingKey EMPTY_ROUTING_KEY = RoutingKey.builder().build();

private final HierarchicalTreeConfig<HierarchialHighLevelActorConfig, String, HierarchicalOperationWorkerConfig> hierarchicalTreeConfig;
private final MessageType messageType;
private final ActorSystem actorSystem;
private final ToIntFunction<M> partitioner;
private final List<ActorObserver> observers;
private final BiFunction<M, MessageMeta, Boolean> messageHandler;
private final BiConsumer<M, MessageMeta> sidelineHandler;

@Getter
private HierarchicalDataStoreSupplierTree<
HierarchicalOperationWorkerConfig,
HierarchialHighLevelActorConfig,
MessageType,
HierarchicalOperationWorker<MessageType, ? extends Message>> worker;


public HierarchicalActor(MessageType messageType,
HierarchialHighLevelActorConfig hierarchicalActorConfig,
ActorSystem actorSystem,
BiFunction<M, MessageMeta, Boolean> messageHandler,
BiConsumer<M, MessageMeta> sidelineHandler,
ToIntFunction<M> partitioner,
List<ActorObserver> observers) {
this.messageType = messageType;
this.hierarchicalTreeConfig = new HierarchicalTreeConfig<>(hierarchicalActorConfig, hierarchicalActorConfig.getChildrenData());
this.actorSystem = actorSystem;
this.messageHandler = messageHandler;
this.sidelineHandler = sidelineHandler;
this.partitioner = partitioner;
this.observers = observers;
}

@Override
public void start() {
log.info("Starting all workers");
this.initializeRouter();
}

@Override
public void close() {
log.info("Closing all workers");
worker.traverse(hierarchicalOperationWorker -> {
log.info("Closing worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
hierarchicalOperationWorker.close();
});
}

@Override
public void purge() {
log.info("Purging all workers");
worker.traverse(hierarchicalOperationWorker -> {
log.info("Purging worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
hierarchicalOperationWorker.purge();
});
}

@Override
public boolean publish(final M message) {
return publishActor(EMPTY_ROUTING_KEY).publish(message);
}

@Override
public boolean publish(final HierarchicalRoutingKey<String> routingKey,
final M message) {
return publishActor(routingKey).publish(message);
}

@Override
public long size() {
log.info("Size of all workers");
val atomicLong = new AtomicLong();
worker.traverse(hierarchicalOperationWorker -> {
log.info("Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
atomicLong.getAndAdd(hierarchicalOperationWorker.size());
});
return atomicLong.get();
}

@Override
public long inFlight() {
log.info("inFlight Size of all workers");
val atomicLong = new AtomicLong();
worker.traverse(hierarchicalOperationWorker -> {
log.info("inFlight Size of worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
atomicLong.getAndAdd(hierarchicalOperationWorker.inFlight());
});
return atomicLong.get();
}

@Override
public boolean isEmpty() {
log.info("isEmpty all workers");
val atomicBoolean = new AtomicBoolean();
worker.traverse(hierarchicalOperationWorker -> {
log.info("isEmpty worker: {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey());
atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isEmpty());
});
return atomicBoolean.get();
}

@Override
public boolean isRunning() {
log.info("isRunning all workers");
val atomicBoolean = new AtomicBoolean();
worker.traverse(hierarchicalOperationWorker -> {
log.info("isRunning worker: {} {} {}", hierarchicalOperationWorker.getType(), hierarchicalOperationWorker.getRoutingKey().getRoutingKey(), hierarchicalOperationWorker.isRunning());
atomicBoolean.set(atomicBoolean.get() && hierarchicalOperationWorker.isRunning());
});
return atomicBoolean.get();
}

private HierarchicalOperationWorker<MessageType, Message> publishActor(final HierarchicalRoutingKey<String> routingKey) {
return (HierarchicalOperationWorker<MessageType, Message>) this.worker.get(messageType, routingKey);
}

private void initializeRouter() {
this.worker = new HierarchicalDataStoreSupplierTree<>(
messageType,
hierarchicalTreeConfig,
HierarchicalRouterUtils.actorConfigToWorkerConfigFunc,
(routingKey, messageTypeKey, workerConfig) -> {
log.info("{} -> {}", routingKey.getRoutingKey(), messageTypeKey);
return new HierarchicalOperationWorker<>(
messageType,
workerConfig,
hierarchicalTreeConfig.getDefaultData(),
routingKey,
actorSystem,
messageHandler,
sidelineHandler,
partitioner,
observers);
}
);
}

}
Loading