Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class ConsumersCoordinator {
private final NavigableSet<ClientSubscriptionsManager> managers = new ConcurrentSkipListSet<>();
private final AtomicLong trackerIdSequence = new AtomicLong(0);

private final boolean debug = true;
private final boolean debug = false;
private final List<SubscriptionTracker> trackers = new CopyOnWriteArrayList<>();

ConsumersCoordinator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ProducersCoordinator {
private final AtomicLong managerIdSequence = new AtomicLong(0);
private final NavigableSet<ClientProducersManager> managers = new ConcurrentSkipListSet<>();
private final AtomicLong trackerIdSequence = new AtomicLong(0);
private final boolean debug = true;
private final boolean debug = false;
private final List<ProducerTracker> producerTrackers = new CopyOnWriteArrayList<>();

ProducersCoordinator(
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,18 @@ static <T> T callAndMaybeRetry(
String description = format(format, args);
int attempt = 0;
Exception lastException = null;
long startTime = System.nanoTime();
boolean keepTrying = true;
while (keepTrying) {
try {
attempt++;
T result = operation.get();
LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt);
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
LOGGER.debug(
"Operation '{}' completed in {} ms after {} attempt(s)",
description,
operationDuration.toMillis(),
attempt);
return result;
} catch (Exception e) {
lastException = e;
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.sun.net.httpserver.HttpServer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
Expand All @@ -37,14 +38,19 @@ class MonitoringContext {
private final Environment environment;

private final Collection<Endpoint> endpoints = Collections.synchronizedList(new ArrayList<>());
private final PrintWriter out;

private volatile HttpServer server;

MonitoringContext(
int monitoringPort, CompositeMeterRegistry meterRegistry, Environment environment) {
int monitoringPort,
CompositeMeterRegistry meterRegistry,
Environment environment,
PrintWriter out) {
this.monitoringPort = monitoringPort;
this.meterRegistry = meterRegistry;
this.environment = environment;
this.out = out;
}

void addHttpEndpoint(String path, String description, HttpHandler handler) {
Expand Down Expand Up @@ -88,7 +94,7 @@ void start() throws Exception {
});

server.start();
System.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort);
this.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort);
}
}

Expand Down
81 changes: 52 additions & 29 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufAllocatorMetric;
import io.netty.buffer.ByteBufAllocatorMetricProvider;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.internal.PlatformDependent;
Expand Down Expand Up @@ -491,7 +493,7 @@ static int run(
return commandLine.execute(args);
}

static void versionInformation(PrintStream out) {
static void versionInformation(PrintWriter out) {
String lineSeparator = System.getProperty("line.separator");
String version =
format(
Expand Down Expand Up @@ -661,13 +663,16 @@ public Integer call() throws Exception {
}
}

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

EnvironmentBuilder environmentBuilder =
Environment.builder()
.id("stream-perf-test")
.uris(this.uris)
.addressResolver(addrResolver)
.scheduledExecutorService(envExecutor)
.metricsCollector(metricsCollector)
.eventLoopGroup(eventLoopGroup)
.byteBufAllocator(byteBufAllocator)
.codec(codec)
.maxProducersByConnection(this.producersByConnection)
Expand Down Expand Up @@ -701,10 +706,20 @@ public Integer call() throws Exception {
}

Environment environment = environmentBuilder.channelCustomizer(channelCustomizer).build();
shutdownService.wrap(closeStep("Closing environment(s)", () -> environment.close()));
if (!isRunTimeLimited()) {
shutdownService.wrap(
closeStep(
"Closing Netty event loop group",
() -> {
if (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown()) {
eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}));
shutdownService.wrap(closeStep("Closing environment", () -> environment.close()));
}

MonitoringContext monitoringContext =
new MonitoringContext(this.monitoringPort, meterRegistry, environment);
new MonitoringContext(this.monitoringPort, meterRegistry, environment, this.out);
this.monitorings.forEach(m -> m.configure(monitoringContext));
monitoringContext.start();

Expand Down Expand Up @@ -958,14 +973,16 @@ public Integer call() throws Exception {
})
.collect(Collectors.toList()));

shutdownService.wrap(
closeStep(
"Closing consumers",
() -> {
for (Consumer consumer : consumers) {
consumer.close();
}
}));
if (!isRunTimeLimited()) {
shutdownService.wrap(
closeStep(
"Closing consumers",
() -> {
for (Consumer consumer : consumers) {
consumer.close();
}
}));
}

ExecutorService executorService;
if (this.producers > 0) {
Expand All @@ -980,23 +997,25 @@ public Integer call() throws Exception {
executorService = null;
}

shutdownService.wrap(
closeStep(
"Closing producers",
() -> {
for (Producer p : producers) {
p.close();
}
}));
if (!isRunTimeLimited()) {
shutdownService.wrap(
closeStep(
"Closing producers",
() -> {
for (Producer p : producers) {
p.close();
}
}));

shutdownService.wrap(
closeStep(
"Closing producers executor service",
() -> {
if (executorService != null) {
executorService.shutdownNow();
}
}));
shutdownService.wrap(
closeStep(
"Closing producers executor service",
() -> {
if (executorService != null) {
executorService.shutdownNow();
}
}));
}

String metricsHeader = "Arguments: " + String.join(" ", arguments);

Expand All @@ -1008,7 +1027,7 @@ public Integer call() throws Exception {
Thread shutdownHook = new Thread(() -> latch.countDown());
Runtime.getRuntime().addShutdownHook(shutdownHook);
try {
if (this.time > 0) {
if (isRunTimeLimited()) {
latch.await(this.time, TimeUnit.SECONDS);
} else {
latch.await();
Expand Down Expand Up @@ -1091,7 +1110,7 @@ private void maybeDisplayEnvironmentVariablesHelp() {

private void maybeDisplayVersion() {
if (this.version) {
versionInformation(System.out);
versionInformation(this.out);
System.exit(0);
}
}
Expand All @@ -1112,6 +1131,10 @@ public String toString() {
};
}

private boolean isRunTimeLimited() {
return this.time > 0;
}

public void monitorings(List<Monitoring> monitorings) {
this.monitorings = monitorings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.URL;
Expand Down Expand Up @@ -165,7 +166,7 @@ void helpShouldReturnImmediately() throws Exception {

@Test
void versionShouldReturnAppropriateInformation() {
StreamPerfTest.versionInformation(new PrintStream(out, true));
StreamPerfTest.versionInformation(new PrintWriter(out, true));
assertThat(consoleOutput()).contains("RabbitMQ Stream Perf Test");
}

Expand Down