Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test-alphas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ jobs:
name: Test against ${{ matrix.rabbitmq-image }}
steps:
- uses: actions/checkout@v4
- name: Checkout tls-gen
uses: actions/checkout@v4
with:
repository: rabbitmq/tls-gen
path: './tls-gen'
- name: Set up JDK
uses: actions/setup-java@v4
with:
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ jobs:

steps:
- uses: actions/checkout@v4
- name: Checkout tls-gen
uses: actions/checkout@v4
with:
repository: rabbitmq/tls-gen
path: './tls-gen'
- name: Set up JDK
uses: actions/setup-java@v4
with:
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/test-supported-java-versions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ jobs:
name: Test against Java ${{ matrix.distribution }} ${{ matrix.version }}
steps:
- uses: actions/checkout@v4
- name: Checkout tls-gen
uses: actions/checkout@v4
with:
repository: rabbitmq/tls-gen
path: './tls-gen'
- name: Set up JDK
uses: actions/setup-java@v4
with:
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ jobs:

steps:
- uses: actions/checkout@v4
- name: Checkout tls-gen
uses: actions/checkout@v4
with:
repository: rabbitmq/tls-gen
path: './tls-gen'
- name: Set up JDK
uses: actions/setup-java@v4
with:
Expand Down
21 changes: 21 additions & 0 deletions ci/start-broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,32 @@ wait_for_message() {
done
}

make -C "${PWD}"/tls-gen/basic

mkdir -p rabbitmq-configuration/tls
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
chmod o+r rabbitmq-configuration/tls/*
chmod g+r rabbitmq-configuration/tls/*

echo "loopback_users = none

listeners.ssl.default = 5671

ssl_options.cacertfile = /etc/rabbitmq/tls/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/tls/server_$(hostname)_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/tls/server_$(hostname)_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false
ssl_options.depth = 1

auth_mechanisms.1 = PLAIN" >> rabbitmq-configuration/rabbitmq.conf

echo "Running RabbitMQ ${RABBITMQ_IMAGE}"

docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"
docker run -d --name rabbitmq \
--network host \
-v "${PWD}"/rabbitmq-configuration:/etc/rabbitmq \
"${RABBITMQ_IMAGE}"

wait_for_message rabbitmq "completed with"
Expand Down
15 changes: 14 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<spotless.check.skip>true</spotless.check.skip>
<rabbitmq.version>5.26.0</rabbitmq.version>
<rabbitmq.version>5.27.0-SNAPSHOT</rabbitmq.version>
<slf4j.version>2.0.17</slf4j.version>
<commons-cli.version>1.10.0</commons-cli.version>
<metrics.version>4.2.33</metrics.version>
<micrometer.version>1.15.2</micrometer.version>
<jgroups.version>5.4.8.Final</jgroups.version>
<jgroups-kubernetes.version>2.0.2.Final</jgroups-kubernetes.version>
<netty.version>4.2.3.Final</netty.version>
<gson.version>2.13.1</gson.version>
<resilience4j.version>2.1.0</resilience4j.version>
<logback.version>1.3.15</logback.version>
Expand Down Expand Up @@ -169,6 +170,18 @@
<artifactId>jgroups-kubernetes</artifactId>
<version>${jgroups-kubernetes.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
<classifier>osx-aarch_64</classifier>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
34 changes: 13 additions & 21 deletions src/docs/asciidoc/usage-advanced.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ Another way to avoid `java.lang.OutOfMemoryError: unable to create new native th
exceptions is to tune the number of file descriptors allowed per process
at the OS level, as some distributions use very low limits.
Here the recommendations are the same as for the broker, so you
can refer to our https://www.rabbitmq.com/networking.html#os-tuning[networking guide].
can refer to our https://www.rabbitmq.com/docs/networking#os-tuning[networking guide].

[[workloads-with-a-large-number-of-clients]]
== Workloads With a Large Number of Clients
Expand Down Expand Up @@ -386,37 +386,29 @@ These are 1-thread thread pools in this case, so this is 10 threads overall inst
huge resource saving to simulate more clients with a single PerfTest instance for large IoT workloads.

By default, PerfTest uses blocking network socket I/O to communicate with
the broker. This mode works fine for clients in many cases but the RabbitMQ Java client
also supports an https://www.rabbitmq.com/api-guide.html#java-nio[asynchronous I/O mode],
where resources like threads can be easily tuned. The goal here is to use as few
resources as possible to simulate as much load as possible with a single PerfTest instance.
the broker.
This mode works fine for clients in many cases but the RabbitMQ Java client can also use Netty for its network layer.
Netty uses a multithreaded event loop to handle I/O operation and the number of threads can be easily tuned.
The goal here is to use as few resources as possible to simulate as much load as possible with a single PerfTest instance.
In the slow publisher example above, a handful of threads should be enough
to handle the I/O. That's what the
`--nio-threads` flag is for:
to handle the I/O.
That's what the `--netty-threads` flag is for:

.Reducing the number of IO threads by enabling the NIO mode with `--nio-threads`
.Reducing the number of I/O threads by using Netty with `--netty-threads`
[source,bash,indent=0]
--------
java -jar perf-test.jar --queue-pattern 'perf-test-%d' \
--queue-pattern-from 1 --queue-pattern-to 1000 \
--producers 1000 --consumers 1000 \
--heartbeat-sender-threads 10 \
--publishing-interval 60 --producer-random-start-delay 1800 \
--producer-scheduler-threads 10 \
--nio-threads 10
--consumers-thread-pools 10 \
--netty-threads 10
--------

This way PerfTest will use 12 threads for I/O over all the connections.
With the default blocking I/O mode, each producer (or consumer)
uses a thread for the I/O loop, that is 2000 threads to simulate 1000 producers and
1000 consumers. Using NIO in PerfTest can dramatically reduce the resources used
to simulate workloads with a large number of connections with appropriate tuning.

Note that in NIO mode the number of threads used can increase temporarily when connections close
unexpectedly and connection recovery kicks in. This is due to the NIO mode dispatching
connection closing to non-I/O threads to avoid deadlocks. Connection recovery can be disabled
with the `--disable-connection-recovery` flag.

This way PerfTest will use 10 threads for I/O over all the connections.
With the default blocking I/O mode, each producer (or consumer) uses a thread for the I/O loop, that is 2000 threads to simulate 1000 producers and 1000 consumers.
Using Netty in PerfTest can dramatically reduce the resources used to simulate workloads with a large number of connections with appropriate tuning.

== Running Producers and Consumers on Different Machines

Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/rabbitmq/perf/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -98,6 +99,7 @@ public class Consumer extends AgentBase implements Runnable {

private final Runnable rateLimiterCallback;
private final boolean rateLimitation;
private final PrintStream out;

public Consumer(ConsumerParameters parameters) {
super(
Expand All @@ -124,6 +126,7 @@ public Consumer(ConsumerParameters parameters) {

this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
this.out = parameters.getOut();

if (parameters.getConsumerLatenciesIndicator().isVariable()) {
this.consumerLatency =
Expand Down Expand Up @@ -372,7 +375,7 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig

@Override
public void handleCancel(String consumerTag) {
System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag);
epochMessageCount.set(0);
if (consumerTagBranchMap.containsKey(consumerTag)) {
String qName = consumerTagBranchMap.get(consumerTag);
Expand All @@ -393,7 +396,7 @@ public void handleCancel(String consumerTag) {
delay.toMillis(),
TimeUnit.MILLISECONDS);
} else {
System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
out.printf("Could not find queue for consumer tag: %s\n", consumerTag);
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/rabbitmq/perf/ConsumerParameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import com.rabbitmq.perf.metrics.PerformanceMetrics;
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -60,6 +61,16 @@ public class ConsumerParameters {

private int id;
private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;
private PrintStream out = System.out;

ConsumerParameters setOut(PrintStream out) {
this.out = out;
return this;
}

PrintStream getOut() {
return out;
}

public Channel getChannel() {
return channel;
Expand Down
23 changes: 22 additions & 1 deletion src/main/java/com/rabbitmq/perf/MulticastParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import com.rabbitmq.perf.metrics.PerformanceMetrics;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -137,6 +138,9 @@ public class MulticastParams {

private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP;

private PrintStream out = System.out;
private boolean netty = false;

public void setExchangeType(String exchangeType) {
this.exchangeType = exchangeType;
}
Expand Down Expand Up @@ -319,6 +323,10 @@ void setConsumerStartDelay(Duration csd) {
this.consumerStartDelay = csd;
}

void setOut(PrintStream out) {
this.out = out;
}

public int getConsumerCount() {
return consumerCount;
}
Expand Down Expand Up @@ -485,6 +493,10 @@ public Duration getConsumerStartDelay() {
return consumerStartDelay;
}

PrintStream getOut() {
return out;
}

public void setPolling(boolean polling) {
this.polling = polling;
}
Expand Down Expand Up @@ -655,7 +667,8 @@ public Consumer createConsumer(
topologyRecordingScheduledExecutorService)
.setStartListener(this.startListener)
.setRateLimiterFactory(this.rateLimiterFactory)
.setFunctionalLogger(this.functionalLogger));
.setFunctionalLogger(this.functionalLogger)
.setOut(this.out));
this.topologyHandler.next();
return consumer;
}
Expand Down Expand Up @@ -761,6 +774,14 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
this.producerSchedulerThreadCount = producerSchedulerThreadCount;
}

void setNetty(boolean netty) {
this.netty = netty;
}

boolean netty() {
return this.netty;
}

/**
* Contract to handle the creation and configuration of resources. E.g. creation of queues,
* binding exchange to queues.
Expand Down
Loading