diff --git a/.github/workflows/test-alphas.yml b/.github/workflows/test-alphas.yml index 473aa106..c8c10c39 100644 --- a/.github/workflows/test-alphas.yml +++ b/.github/workflows/test-alphas.yml @@ -19,6 +19,11 @@ jobs: name: Test against ${{ matrix.rabbitmq-image }} steps: - uses: actions/checkout@v4 + - name: Checkout tls-gen + uses: actions/checkout@v4 + with: + repository: rabbitmq/tls-gen + path: './tls-gen' - name: Set up JDK uses: actions/setup-java@v4 with: diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 2cdb8b7c..06293d21 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -12,6 +12,11 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Checkout tls-gen + uses: actions/checkout@v4 + with: + repository: rabbitmq/tls-gen + path: './tls-gen' - name: Set up JDK uses: actions/setup-java@v4 with: diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml index 403952d1..22a9dd33 100644 --- a/.github/workflows/test-supported-java-versions.yml +++ b/.github/workflows/test-supported-java-versions.yml @@ -18,6 +18,11 @@ jobs: name: Test against Java ${{ matrix.distribution }} ${{ matrix.version }} steps: - uses: actions/checkout@v4 + - name: Checkout tls-gen + uses: actions/checkout@v4 + with: + repository: rabbitmq/tls-gen + path: './tls-gen' - name: Set up JDK uses: actions/setup-java@v4 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 30f054f7..93ad92c4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,6 +12,11 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Checkout tls-gen + uses: actions/checkout@v4 + with: + repository: rabbitmq/tls-gen + path: './tls-gen' - name: Set up JDK uses: actions/setup-java@v4 with: diff --git a/ci/start-broker.sh b/ci/start-broker.sh index f3d7fe02..624dd590 100755 --- a/ci/start-broker.sh +++ b/ci/start-broker.sh @@ -10,11 +10,32 @@ wait_for_message() { done } +make -C "${PWD}"/tls-gen/basic + +mkdir -p rabbitmq-configuration/tls +cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls +chmod o+r rabbitmq-configuration/tls/* +chmod g+r rabbitmq-configuration/tls/* + +echo "loopback_users = none + +listeners.ssl.default = 5671 + +ssl_options.cacertfile = /etc/rabbitmq/tls/ca_certificate.pem +ssl_options.certfile = /etc/rabbitmq/tls/server_$(hostname)_certificate.pem +ssl_options.keyfile = /etc/rabbitmq/tls/server_$(hostname)_key.pem +ssl_options.verify = verify_peer +ssl_options.fail_if_no_peer_cert = false +ssl_options.depth = 1 + +auth_mechanisms.1 = PLAIN" >> rabbitmq-configuration/rabbitmq.conf + echo "Running RabbitMQ ${RABBITMQ_IMAGE}" docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running" docker run -d --name rabbitmq \ --network host \ + -v "${PWD}"/rabbitmq-configuration:/etc/rabbitmq \ "${RABBITMQ_IMAGE}" wait_for_message rabbitmq "completed with" diff --git a/pom.xml b/pom.xml index 3e165b4d..40f7bf52 100644 --- a/pom.xml +++ b/pom.xml @@ -55,13 +55,14 @@ UTF-8 true - 5.26.0 + 5.27.0-SNAPSHOT 2.0.17 1.10.0 4.2.33 1.15.2 5.4.8.Final 2.0.2.Final + 4.2.3.Final 2.13.1 2.1.0 1.3.15 @@ -169,6 +170,18 @@ jgroups-kubernetes ${jgroups-kubernetes.version} + + io.netty + netty-transport-native-epoll + ${netty.version} + linux-x86_64 + + + io.netty + netty-transport-native-kqueue + ${netty.version} + osx-aarch_64 + org.junit.jupiter diff --git a/src/docs/asciidoc/usage-advanced.adoc b/src/docs/asciidoc/usage-advanced.adoc index f609cba6..514c4598 100644 --- a/src/docs/asciidoc/usage-advanced.adoc +++ b/src/docs/asciidoc/usage-advanced.adoc @@ -276,7 +276,7 @@ Another way to avoid `java.lang.OutOfMemoryError: unable to create new native th exceptions is to tune the number of file descriptors allowed per process at the OS level, as some distributions use very low limits. Here the recommendations are the same as for the broker, so you -can refer to our https://www.rabbitmq.com/networking.html#os-tuning[networking guide]. +can refer to our https://www.rabbitmq.com/docs/networking#os-tuning[networking guide]. [[workloads-with-a-large-number-of-clients]] == Workloads With a Large Number of Clients @@ -386,37 +386,29 @@ These are 1-thread thread pools in this case, so this is 10 threads overall inst huge resource saving to simulate more clients with a single PerfTest instance for large IoT workloads. By default, PerfTest uses blocking network socket I/O to communicate with -the broker. This mode works fine for clients in many cases but the RabbitMQ Java client -also supports an https://www.rabbitmq.com/api-guide.html#java-nio[asynchronous I/O mode], -where resources like threads can be easily tuned. The goal here is to use as few -resources as possible to simulate as much load as possible with a single PerfTest instance. +the broker. +This mode works fine for clients in many cases but the RabbitMQ Java client can also use Netty for its network layer. +Netty uses a multithreaded event loop to handle I/O operation and the number of threads can be easily tuned. +The goal here is to use as few resources as possible to simulate as much load as possible with a single PerfTest instance. In the slow publisher example above, a handful of threads should be enough -to handle the I/O. That's what the -`--nio-threads` flag is for: +to handle the I/O. +That's what the `--netty-threads` flag is for: -.Reducing the number of IO threads by enabling the NIO mode with `--nio-threads` +.Reducing the number of I/O threads by using Netty with `--netty-threads` [source,bash,indent=0] -------- java -jar perf-test.jar --queue-pattern 'perf-test-%d' \ --queue-pattern-from 1 --queue-pattern-to 1000 \ --producers 1000 --consumers 1000 \ - --heartbeat-sender-threads 10 \ --publishing-interval 60 --producer-random-start-delay 1800 \ --producer-scheduler-threads 10 \ - --nio-threads 10 + --consumers-thread-pools 10 \ + --netty-threads 10 -------- -This way PerfTest will use 12 threads for I/O over all the connections. -With the default blocking I/O mode, each producer (or consumer) -uses a thread for the I/O loop, that is 2000 threads to simulate 1000 producers and -1000 consumers. Using NIO in PerfTest can dramatically reduce the resources used -to simulate workloads with a large number of connections with appropriate tuning. - -Note that in NIO mode the number of threads used can increase temporarily when connections close -unexpectedly and connection recovery kicks in. This is due to the NIO mode dispatching -connection closing to non-I/O threads to avoid deadlocks. Connection recovery can be disabled -with the `--disable-connection-recovery` flag. - +This way PerfTest will use 10 threads for I/O over all the connections. +With the default blocking I/O mode, each producer (or consumer) uses a thread for the I/O loop, that is 2000 threads to simulate 1000 producers and 1000 consumers. +Using Netty in PerfTest can dramatically reduce the resources used to simulate workloads with a large number of connections with appropriate tuning. == Running Producers and Consumers on Different Machines diff --git a/src/main/java/com/rabbitmq/perf/Consumer.java b/src/main/java/com/rabbitmq/perf/Consumer.java index a11841f1..89e7ef19 100644 --- a/src/main/java/com/rabbitmq/perf/Consumer.java +++ b/src/main/java/com/rabbitmq/perf/Consumer.java @@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.io.PrintStream; import java.time.Duration; import java.util.*; import java.util.concurrent.Callable; @@ -98,6 +99,7 @@ public class Consumer extends AgentBase implements Runnable { private final Runnable rateLimiterCallback; private final boolean rateLimitation; + private final PrintStream out; public Consumer(ConsumerParameters parameters) { super( @@ -124,6 +126,7 @@ public Consumer(ConsumerParameters parameters) { this.queueNames.set(new ArrayList<>(parameters.getQueueNames())); this.initialQueueNames = new ArrayList<>(parameters.getQueueNames()); + this.out = parameters.getOut(); if (parameters.getConsumerLatenciesIndicator().isVariable()) { this.consumerLatency = @@ -372,7 +375,7 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig @Override public void handleCancel(String consumerTag) { - System.out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag); + out.printf("Consumer cancelled by broker for tag: %s\n", consumerTag); epochMessageCount.set(0); if (consumerTagBranchMap.containsKey(consumerTag)) { String qName = consumerTagBranchMap.get(consumerTag); @@ -393,7 +396,7 @@ public void handleCancel(String consumerTag) { delay.toMillis(), TimeUnit.MILLISECONDS); } else { - System.out.printf("Could not find queue for consumer tag: %s\n", consumerTag); + out.printf("Could not find queue for consumer tag: %s\n", consumerTag); } } } diff --git a/src/main/java/com/rabbitmq/perf/ConsumerParameters.java b/src/main/java/com/rabbitmq/perf/ConsumerParameters.java index f87c1a1f..9740f8f9 100644 --- a/src/main/java/com/rabbitmq/perf/ConsumerParameters.java +++ b/src/main/java/com/rabbitmq/perf/ConsumerParameters.java @@ -18,6 +18,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.perf.PerfTest.EXIT_WHEN; import com.rabbitmq.perf.metrics.PerformanceMetrics; +import java.io.PrintStream; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -60,6 +61,16 @@ public class ConsumerParameters { private int id; private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP; + private PrintStream out = System.out; + + ConsumerParameters setOut(PrintStream out) { + this.out = out; + return this; + } + + PrintStream getOut() { + return out; + } public Channel getChannel() { return channel; diff --git a/src/main/java/com/rabbitmq/perf/MulticastParams.java b/src/main/java/com/rabbitmq/perf/MulticastParams.java index 79b7d520..576cdda7 100644 --- a/src/main/java/com/rabbitmq/perf/MulticastParams.java +++ b/src/main/java/com/rabbitmq/perf/MulticastParams.java @@ -24,6 +24,7 @@ import com.rabbitmq.perf.PerfTest.EXIT_WHEN; import com.rabbitmq.perf.metrics.PerformanceMetrics; import java.io.IOException; +import java.io.PrintStream; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -137,6 +138,9 @@ public class MulticastParams { private FunctionalLogger functionalLogger = FunctionalLogger.NO_OP; + private PrintStream out = System.out; + private boolean netty = false; + public void setExchangeType(String exchangeType) { this.exchangeType = exchangeType; } @@ -319,6 +323,10 @@ void setConsumerStartDelay(Duration csd) { this.consumerStartDelay = csd; } + void setOut(PrintStream out) { + this.out = out; + } + public int getConsumerCount() { return consumerCount; } @@ -485,6 +493,10 @@ public Duration getConsumerStartDelay() { return consumerStartDelay; } + PrintStream getOut() { + return out; + } + public void setPolling(boolean polling) { this.polling = polling; } @@ -655,7 +667,8 @@ public Consumer createConsumer( topologyRecordingScheduledExecutorService) .setStartListener(this.startListener) .setRateLimiterFactory(this.rateLimiterFactory) - .setFunctionalLogger(this.functionalLogger)); + .setFunctionalLogger(this.functionalLogger) + .setOut(this.out)); this.topologyHandler.next(); return consumer; } @@ -761,6 +774,14 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) { this.producerSchedulerThreadCount = producerSchedulerThreadCount; } + void setNetty(boolean netty) { + this.netty = netty; + } + + boolean netty() { + return this.netty; + } + /** * Contract to handle the creation and configuration of resources. E.g. creation of queues, * binding exchange to queues. diff --git a/src/main/java/com/rabbitmq/perf/MulticastSet.java b/src/main/java/com/rabbitmq/perf/MulticastSet.java index 3392e8aa..92e085ca 100644 --- a/src/main/java/com/rabbitmq/perf/MulticastSet.java +++ b/src/main/java/com/rabbitmq/perf/MulticastSet.java @@ -28,6 +28,7 @@ import com.rabbitmq.perf.metrics.PerformanceMetrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; +import java.io.PrintStream; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; @@ -80,6 +81,7 @@ public class MulticastSet { private final ConnectionCreator connectionCreator; private final ExpectedMetrics expectedMetrics; private final InstanceSynchronization instanceSynchronization; + private final PrintStream out; public MulticastSet( PerformanceMetrics performanceMetrics, @@ -167,6 +169,7 @@ public MulticastSet( this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation); this.expectedMetrics = expectedMetrics; this.instanceSynchronization = instanceSynchronization; + this.out = params.getOut(); } protected static int nbThreadsForConsumer(MulticastParams params) { @@ -233,10 +236,17 @@ public void run(boolean announceStartup) : params.getServersUpLimit(), uris, factory)) { - ScheduledExecutorService heartbeatSenderExecutorService = - this.threadingHandler.scheduledExecutorService( - "perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads()); - factory.setHeartbeatExecutor(heartbeatSenderExecutorService); + // heartbeat sender executor not necessary with Netty + if (!params.netty()) { + ScheduledExecutorService heartbeatSenderExecutorService = + this.threadingHandler.scheduledExecutorService( + "perf-test-heartbeat-sender-", this.params.getHeartbeatSenderThreads()); + factory.setHeartbeatExecutor(heartbeatSenderExecutorService); + if (heartbeatSenderExecutorService != null) { + shutdownService.wrap(heartbeatSenderExecutorService::shutdownNow); + } + } + // use a single-threaded executor for the configuration connection // this way, a default one is not created and this one will shut down // when the run ends. @@ -382,7 +392,7 @@ public void run(boolean announceStartup) executeShutdownSequence.run(); } else { - System.out.println( + out.println( "Could not connect to broker(s) in " + params.getServersStartUpTimeout() + " second(s), exiting."); @@ -475,7 +485,7 @@ private void createConsumers( int consumerIndex = 0; for (int i = 0; i < consumerConnections.length; i++) { if (announceStartup) { - System.out.println("id: " + testID + ", starting consumer #" + i); + out.println("id: " + testID + ", starting consumer #" + i); } ExecutorService executorService = consumersExecutorsFactory.apply(i); factory.setSharedExecutor(executorService); @@ -484,7 +494,7 @@ private void createConsumers( consumerConnections[i] = consumerConnection; for (int j = 0; j < params.getConsumerChannelCount(); j++) { if (announceStartup) { - System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j); + out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j); } Consumer consumer = params.createConsumer( @@ -507,13 +517,13 @@ private void createProducers( int producerIndex = 0; for (int i = 0; i < producerConnections.length; i++) { if (announceStartup) { - System.out.println("id: " + testID + ", starting producer #" + i); + out.println("id: " + testID + ", starting producer #" + i); } Connection producerConnection = createConnection(PRODUCER_THREAD_PREFIX + i); producerConnections[i] = producerConnection; for (int j = 0; j < params.getProducerChannelCount(); j++) { if (announceStartup) { - System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j); + out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j); } AgentState agentState = new AgentState(); agentState.runnable = @@ -538,7 +548,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce runnable.run(); LOGGER.debug("Consumer runnable started"); if (params.getConsumerSlowStart()) { - System.out.println("Delaying start by 1 second because -S/--slow-start was requested"); + out.println("Delaying start by 1 second because -S/--slow-start was requested"); Thread.sleep(1000); } } @@ -551,8 +561,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce for (Runnable runnable : consumerRunnables) { runnable.run(); if (params.getConsumerSlowStart()) { - System.out.println( - "Delaying start by 1 second because -S/--slow-start was requested"); + out.println("Delaying start by 1 second because -S/--slow-start was requested"); try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/src/main/java/com/rabbitmq/perf/PerfTest.java b/src/main/java/com/rabbitmq/perf/PerfTest.java index afa498f2..8c38610a 100644 --- a/src/main/java/com/rabbitmq/perf/PerfTest.java +++ b/src/main/java/com/rabbitmq/perf/PerfTest.java @@ -17,6 +17,7 @@ import static com.rabbitmq.client.ConnectionFactory.computeDefaultTlsProtocol; import static com.rabbitmq.perf.OptionsUtils.forEach; +import static com.rabbitmq.perf.Tuples.pair; import static com.rabbitmq.perf.Utils.strArg; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -32,8 +33,8 @@ import com.rabbitmq.client.impl.CredentialsRefreshService; import com.rabbitmq.client.impl.DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder; import com.rabbitmq.client.impl.DefaultExceptionHandler; -import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.perf.Metrics.ConfigurationContext; +import com.rabbitmq.perf.Tuples.Pair; import com.rabbitmq.perf.Utils.GsonOAuth2ClientCredentialsGrantCredentialsProvider; import com.rabbitmq.perf.metrics.CompositeMetricsFormatter; import com.rabbitmq.perf.metrics.CsvMetricsFormatter; @@ -42,10 +43,22 @@ import com.rabbitmq.perf.metrics.MetricsFormatterFactory; import com.rabbitmq.perf.metrics.MetricsFormatterFactory.Context; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollIoHandler; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueIoHandler; +import io.netty.channel.kqueue.KQueueSocketChannel; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.*; import java.math.BigDecimal; import java.nio.charset.Charset; -import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; import java.time.Duration; import java.time.Instant; @@ -56,7 +69,9 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -105,7 +120,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { } if (cmd.hasOption('v')) { - versionInformation(); + versionInformation(consoleOut); systemExiter.exit(0); } @@ -134,7 +149,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { String metricsPrefix = strArg(cmd, "mpx", "perftest_"); CompositeMeterRegistry registry = new CompositeMeterRegistry(); - shutdownService.wrap(() -> registry.close()); + shutdownService.wrap(registry::close); List uris; if (urisParameter != null) { @@ -156,14 +171,14 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { if (recoveryDelayHandler != null) { factory.setRecoveryDelayHandler(recoveryDelayHandler); } - SSLContext sslContext = - perfTestOptions.skipSslContextConfiguration - ? null - : getSslContextIfNecessary(cmd, System.getProperties()); - if (sslContext != null) { + SSLContext sslContext = null; + if (!perfTestOptions.skipSslContextConfiguration + && useDefaultSslContext(cmd, System.getProperties())) { + sslContext = SSLContext.getDefault(); factory.useSslProtocol(sslContext); } + if (saslExternal) { factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); } @@ -171,12 +186,15 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { factory.setUri(uris.get(0)); factory.setRequestedFrameMax(frameMax); factory.setRequestedHeartbeat(heartbeat); - factory = configureNioIfRequested(cmd, factory); - if (factory.getNioParams().getNioExecutor() != null) { - ExecutorService nioExecutor = factory.getNioParams().getNioExecutor(); - shutdownService.wrap(() -> nioExecutor.shutdownNow()); + + boolean netty = netty(cmd); + if (netty) { + factory.useBlockingIo(); } + configureNioIfRequested(cmd, factory, shutdownService); + configureNettyIfRequested(cmd, factory, shutdownService); + String oauth2TokenEndpoint = strArg(cmd, "o2uri", null); if (oauth2TokenEndpoint != null) { String clientId = strArg(cmd, "o2id", null); @@ -232,9 +250,16 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { } factory.setSocketConfigurator(Utils.socketConfigurator(cmd)); - if (factory.getNioParams() != null) { - factory.getNioParams().setSocketChannelConfigurator(Utils.socketChannelConfigurator(cmd)); - factory.getNioParams().setSslEngineConfigurator(Utils.sslEngineConfigurator(cmd)); + if (nioParams(factory) != null) { + nioParams(factory).setSocketChannelConfigurator(Utils.socketChannelConfigurator(cmd)); + nioParams(factory).setSslEngineConfigurator(Utils.sslEngineConfigurator(cmd)); + } + + if (netty) { + factory.netty().channelCustomizer(Utils.channelCustomizer(cmd)); + if (factory.isSSL()) { + factory.netty().sslContext(Utils.nettySslContext(sslContext)); + } } metrics.configure( @@ -325,7 +350,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { consoleOut.flush(); } }; - shutdownService.wrap(() -> statsSummary.run()); + shutdownService.wrap(statsSummary::run); Map exposedMetrics = convertKeyValuePairs(strArg(cmd, "em", null)); ExpectedMetrics expectedMetrics = @@ -384,6 +409,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { static MulticastParams multicastParams( CommandLineProxy cmd, List uris, PerfTestOptions perfTestOptions) throws Exception { SystemExiter systemExiter = perfTestOptions.systemExiter; + PrintStream consoleOut = perfTestOptions.consoleOut; PrintStream consoleErr = perfTestOptions.consoleErr; String exchangeType = strArg(cmd, 't', "direct"); @@ -670,6 +696,8 @@ static MulticastParams multicastParams( functionalLogger = new DefaultFunctionalLogger(perfTestOptions.consoleOut, verboseFull); } + boolean netty = netty(cmd); + MulticastParams p = new MulticastParams(); p.setAutoAck(autoAck); p.setAutoDelete(autoDelete); @@ -731,10 +759,12 @@ static MulticastParams multicastParams( p.setConsumerArguments(consumerArguments); p.setQueuesInSequence(queueFile != null); p.setExitWhen(exitWhen); - p.setCluster(uris.size() > 0); + p.setCluster(!uris.isEmpty()); p.setConsumerStartDelay(consumerStartDelay); p.setRateLimiterFactory(rateLimiterFactory); p.setFunctionalLogger(functionalLogger); + p.setOut(consoleOut); + p.setNetty(netty); return p; } @@ -774,12 +804,14 @@ private static PrintWriter openCsvFileForWriting( return output; } - private static ConnectionFactory configureNioIfRequested( - CommandLineProxy cmd, ConnectionFactory factory) { + @SuppressWarnings("deprecation") + private static void configureNioIfRequested( + CommandLineProxy cmd, ConnectionFactory factory, ShutdownService shutdownService) { int nbThreads = Utils.intArg(cmd, "niot", -1); int executorSize = Utils.intArg(cmd, "niotp", -1); if (nbThreads > 0 || executorSize > 0) { - NioParams nioParams = new NioParams(); + com.rabbitmq.client.impl.nio.NioParams nioParams = + new com.rabbitmq.client.impl.nio.NioParams(); int[] nbThreadsAndExecutorSize = getNioNbThreadsAndExecutorSize(nbThreads, executorSize); nioParams.setNbIoThreads(nbThreadsAndExecutorSize[0]); // FIXME we cannot limit the max size of the thread pool because of @@ -789,18 +821,19 @@ private static ConnectionFactory configureNioIfRequested( // the thread pool is busy closing the connections, connection recovery // kicks in the same used threads, and new connection cannot be opened as // there are no available threads anymore in the pool for NIO! - nioParams.setNioExecutor( + ExecutorService nioExecutor = new ThreadPoolExecutor( nbThreadsAndExecutorSize[1], Integer.MAX_VALUE, 30L, TimeUnit.SECONDS, new SynchronousQueue<>(), - new NamedThreadFactory("perf-test-nio-"))); + new NamedThreadFactory("perf-test-nio-")); + nioParams.setNioExecutor(nioExecutor); + shutdownService.wrap(nioExecutor::shutdownNow); factory.useNio(); factory.setNioParams(nioParams); } - return factory; } protected static int[] getNioNbThreadsAndExecutorSize( @@ -826,6 +859,57 @@ protected static int[] getNioNbThreadsAndExecutorSize( }; } + @SuppressWarnings("deprecation") + private static com.rabbitmq.client.impl.nio.NioParams nioParams(ConnectionFactory cf) { + return cf.getNioParams(); + } + + private static void configureNettyIfRequested( + CommandLineProxy cmd, ConnectionFactory factory, ShutdownService shutdownService) { + if (netty(cmd)) { + int nbThreads = Utils.intArg(cmd, "ntyt", -1); + boolean epoll = hasOption(cmd, "ntyep"); + boolean kqueue = hasOption(cmd, "ntykq"); + Pair> io = nettyIo(epoll, kqueue); + IoHandlerFactory ioHandlerFactory = io.v1(); + Consumer bootstrapCustomizer = b -> b.channel(io.v2()); + EventLoopGroup eventLoopGroup = + nbThreads > 0 + ? new MultiThreadIoEventLoopGroup(nbThreads, ioHandlerFactory) + : new MultiThreadIoEventLoopGroup(ioHandlerFactory); + shutdownService.wrap(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)); + factory.netty().bootstrapCustomizer(bootstrapCustomizer).eventLoopGroup(eventLoopGroup); + } + } + + private static Pair> nettyIo( + boolean epoll, boolean kqueue) { + Supplier>> result = + () -> pair(NioIoHandler.newFactory(), NioSocketChannel.class); + if (epoll) { + if (Epoll.isAvailable()) { + result = () -> pair(EpollIoHandler.newFactory(), EpollSocketChannel.class); + } else { + LOGGER.warn("epoll not available, using Java NIO instead"); + } + } else if (kqueue) { + if (KQueue.isAvailable()) { + result = () -> pair(KQueueIoHandler.newFactory(), KQueueSocketChannel.class); + } else { + LOGGER.warn("kqueue not available, using Java NIO instead"); + } + } + return result.get(); + } + + private static boolean netty(CommandLineProxy cmd) { + boolean netty = hasOption(cmd, "nty"); + int nbThreads = Utils.intArg(cmd, "ntyt", -1); + boolean epoll = hasOption(cmd, "ntyep"); + boolean kqueue = hasOption(cmd, "ntykq"); + return netty || nbThreads > 0 || epoll || kqueue; + } + static MulticastSet.CompletionHandler getCompletionHandler( MulticastParams p, ConcurrentMap reasons) { MulticastSet.CompletionHandler completionHandler; @@ -865,21 +949,20 @@ public static void main(String[] args) throws IOException { .setExceptionHandler(new RelaxedExceptionHandler())); } - private static SSLContext getSslContextIfNecessary( - CommandLineProxy cmd, Properties systemProperties) throws NoSuchAlgorithmException { - SSLContext sslContext = null; + private static boolean useDefaultSslContext(CommandLineProxy cmd, Properties systemProperties) { + boolean result = false; if (hasOption(cmd, "udsc") || hasOption(cmd, "useDefaultSslContext")) { LOGGER.info("Using default SSL context as per command line option"); - sslContext = SSLContext.getDefault(); + result = true; } for (String propertyName : systemProperties.stringPropertyNames()) { if (propertyName != null && isPropertyTlsRelated(propertyName)) { LOGGER.info("TLS related system properties detected, using default SSL context"); - sslContext = SSLContext.getDefault(); + result = true; break; } } - return sslContext; + return result; } private static boolean isPropertyTlsRelated(String propertyName) { @@ -1079,7 +1162,8 @@ static Options getOptions() { "hst", "heartbeat-sender-threads", true, - "number of threads for producers and consumers heartbeat senders, default is 1 thread per connection")); + "number of threads for producers and consumers heartbeat senders, default is 1 thread per connection " + + "(not necessary when using Netty)")); options.addOption( new Option( "mp", @@ -1120,13 +1204,17 @@ static Options getOptions() { true, "number of threads to use when using --publishing-interval, default is calculated by PerfTest")); options.addOption( - new Option("niot", "nio-threads", true, "number of NIO threads to use, default is 1")); + new Option( + "niot", + "nio-threads", + true, + "number of NIO threads to use, default is 1 (deprecated, use Netty instead)")); options.addOption( new Option( "niotp", "nio-thread-pool", true, - "size of NIO thread pool, should be slightly higher than number of NIO threads, default is --nio-threads + 2")); + "size of NIO thread pool, should be slightly higher than number of NIO threads, default is --nio-threads + 2 (deprecated, use Netty instead)")); options.addOption(new Option("mh", "metrics-help", false, "show metrics usage")); @@ -1389,6 +1477,25 @@ static Options getOptions() { true, "the way to allocate connection across nodes (random or round-robin), default is random.")); + options.addOption(new Option("nty", "netty", false, "use Netty for IO")); + options.addOption( + new Option( + "ntyt", + "netty-threads", + true, + "number of Netty threads to use (default is Netty's default)")); + options.addOption( + new Option( + "ntyep", + "netty-epoll", + false, + "use Netty's native epoll transport (Linux x86-64 only)")); + options.addOption( + new Option( + "ntykq", + "netty-kqueue", + false, + "use Netty's native kqueue transport (macOS aarch64 only)")); return options; } @@ -1490,7 +1597,7 @@ private static String getExchangeName(CommandLineProxy cmd, String def) { return exchangeName; } - private static void versionInformation() { + private static void versionInformation(PrintStream out) { String version = format( "RabbitMQ Perf Test %s (%s; %s)", @@ -1515,8 +1622,8 @@ private static void versionInformation() { System.getProperty("os.name"), System.getProperty("os.version"), System.getProperty("os.arch"))); - System.out.println("\u001B[1m" + version); - System.out.println("\u001B[0m" + info); + out.println("\u001B[1m" + version); + out.println("\u001B[0m" + info); } static Duration parsePublishingInterval(String input) { diff --git a/src/main/java/com/rabbitmq/perf/Tuples.java b/src/main/java/com/rabbitmq/perf/Tuples.java new file mode 100644 index 00000000..b5c2d695 --- /dev/null +++ b/src/main/java/com/rabbitmq/perf/Tuples.java @@ -0,0 +1,44 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.perf; + +final class Tuples { + + private Tuples() {} + + public static Pair pair(A v1, B v2) { + return new Pair<>(v1, v2); + } + + public static class Pair { + + private final A v1; + private final B v2; + + private Pair(A v1, B v2) { + this.v1 = v1; + this.v2 = v2; + } + + public A v1() { + return this.v1; + } + + public B v2() { + return this.v2; + } + } +} diff --git a/src/main/java/com/rabbitmq/perf/Utils.java b/src/main/java/com/rabbitmq/perf/Utils.java index d3143b09..2ec0d509 100644 --- a/src/main/java/com/rabbitmq/perf/Utils.java +++ b/src/main/java/com/rabbitmq/perf/Utils.java @@ -15,11 +15,20 @@ // info@rabbitmq.com. package com.rabbitmq.perf; +import static com.rabbitmq.client.ConnectionFactory.computeDefaultTlsProtocol; + import com.google.gson.Gson; import com.rabbitmq.client.*; import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider; import com.rabbitmq.client.impl.OAuthTokenManagementException; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.IdentityCipherSuiteFilter; +import io.netty.handler.ssl.JdkSslContext; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.Constructor; @@ -27,26 +36,29 @@ import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIServerName; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLException; import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.X509TrustManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,27 +83,26 @@ public boolean isDone() { } @Override - public Object get() throws InterruptedException, ExecutionException { + public Object get() { return null; } @Override - public Object get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public Object get(long timeout, TimeUnit unit) { return null; } }; private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); - private static final ConnectionFactory CF = new ConnectionFactory(); static boolean isRecoverable(Connection connection) { return connection instanceof AutorecoveringConnection; } - static synchronized Address extract(String uri) + static Address extract(String uri) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { - CF.setUri(uri); - return new Address(CF.getHost(), CF.getPort()); + ConnectionFactory cf = new ConnectionFactory(); + cf.setUri(uri); + return new Address(cf.getHost(), cf.getPort()); } /** @@ -129,8 +140,8 @@ static RecoveryDelayHandler getRecoveryDelayHandler(String argument) { static List sniServerNames(String argumentValue) { if (argumentValue != null && !argumentValue.trim().isEmpty()) { return Arrays.stream(argumentValue.split(",")) - .map(s -> s.trim()) - .map(s -> new SNIHostName(s)) + .map(String::trim) + .map(SNIHostName::new) .collect(Collectors.toList()); } else { return Collections.emptyList(); @@ -157,38 +168,82 @@ static SocketConfigurator socketConfigurator(CommandLineProxy cmd) { } }); } - int sendBufferSize = intArg(cmd, "tsbs", 65_536); - int receiveBufferSize = intArg(cmd, "trbs", 65_536); - boolean tcpNoDelay = boolArg(cmd, "tnd", "true"); + TcpSettings settings = new TcpSettings(cmd); socketConfigurator = socketConfigurator.andThen( socket -> { - if (sendBufferSize > 0) { - socket.setSendBufferSize(sendBufferSize); + if (settings.sendBufferSize() > 0) { + socket.setSendBufferSize(settings.sendBufferSize()); } - if (receiveBufferSize > 0) { - socket.setReceiveBufferSize(receiveBufferSize); + if (settings.receiveBufferSize() > 0) { + socket.setReceiveBufferSize(settings.receiveBufferSize()); } - socket.setTcpNoDelay(tcpNoDelay); + socket.setTcpNoDelay(settings.tcpNoDelay()); }); return socketConfigurator; } + static Consumer channelCustomizer(CommandLineProxy cmd) { + TcpSettings settings = new TcpSettings(cmd); + List servers = sniServerNames(strArg(cmd, "sni", null)); + return ch -> { + if (settings.sendBufferSize() > 0) { + ch.setOption(ChannelOption.SO_SNDBUF, settings.sendBufferSize()); + } + if (settings.receiveBufferSize() > 0) { + ch.setOption(ChannelOption.SO_RCVBUF, settings.receiveBufferSize()); + } + ch.setOption(ChannelOption.TCP_NODELAY, settings.tcpNoDelay()); + if (!servers.isEmpty()) { + SslHandler sslHandler = ch.pipeline().get(SslHandler.class); + if (sslHandler == null) { + LOGGER.warn("SNI TLS parameter set but there is no TLS Netty handler"); + } else { + SSLParameters sslParameters = sslHandler.engine().getSSLParameters(); + sslParameters.setServerNames(servers); + sslHandler.engine().setSSLParameters(sslParameters); + } + } + }; + } + + static SslContext nettySslContext(SSLContext defaultSslContext) + throws SSLException, NoSuchAlgorithmException { + if (defaultSslContext == null) { + return SslContextBuilder.forClient() + .protocols( + computeDefaultTlsProtocol( + SSLContext.getDefault().getSupportedSSLParameters().getProtocols())) + .trustManager(new TrustAllTrustManager()) + .build(); + } else { + return new JdkSslContext( + defaultSslContext, + true, + null, + IdentityCipherSuiteFilter.INSTANCE, + null, + ClientAuth.NONE, + null, + false); + } + } + + @SuppressWarnings("deprecation") static SocketChannelConfigurator socketChannelConfigurator(CommandLineProxy cmd) { - int sendBufferSize = intArg(cmd, "tsbs", -1); - int receiveBufferSize = intArg(cmd, "trbs", -1); - boolean tcpNoDelay = boolArg(cmd, "tnd", "true"); + TcpSettings settings = new TcpSettings(cmd); return socketChannel -> { - if (sendBufferSize > 0) { - socketChannel.socket().setSendBufferSize(sendBufferSize); + if (settings.sendBufferSize() > 0) { + socketChannel.socket().setSendBufferSize(settings.sendBufferSize()); } - if (receiveBufferSize > 0) { - socketChannel.socket().setReceiveBufferSize(receiveBufferSize); + if (settings.receiveBufferSize() > 0) { + socketChannel.socket().setReceiveBufferSize(settings.receiveBufferSize()); } - socketChannel.socket().setTcpNoDelay(tcpNoDelay); + socketChannel.socket().setTcpNoDelay(settings.tcpNoDelay()); }; } + @SuppressWarnings("deprecation") static SslEngineConfigurator sslEngineConfigurator(CommandLineProxy cmd) { List serverNames = sniServerNames(strArg(cmd, "sni", null)); if (serverNames.isEmpty()) { @@ -269,13 +324,10 @@ static InstanceSynchronization defaultInstanceSynchronization( throw new IllegalArgumentException("Multi-instance synchronization is not available"); } }; - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { + } catch (NoSuchMethodException + | InvocationTargetException + | InstantiationException + | IllegalAccessException e) { throw new RuntimeException(e); } } @@ -315,4 +367,43 @@ protected Token parseToken(String response) { } } } + + private static class TcpSettings { + + private final int sendBufferSize; + private final int receiveBufferSize; + private final boolean tcpNoDelay; + + private TcpSettings(CommandLineProxy cmd) { + this.sendBufferSize = intArg(cmd, "tsbs", 65_536); + this.receiveBufferSize = intArg(cmd, "trbs", 65_536); + this.tcpNoDelay = boolArg(cmd, "tnd", "true"); + } + + private int sendBufferSize() { + return this.sendBufferSize; + } + + private int receiveBufferSize() { + return this.receiveBufferSize; + } + + private boolean tcpNoDelay() { + return this.tcpNoDelay; + } + } + + static class TrustAllTrustManager implements X509TrustManager { + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) {} + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } } diff --git a/src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java b/src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java index 63d1007b..f541c7e5 100644 --- a/src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java +++ b/src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java @@ -29,10 +29,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.perf.MulticastParams; import com.rabbitmq.perf.MulticastSet; -import com.rabbitmq.perf.NamedThreadFactory; import com.rabbitmq.perf.PerformanceMetricsAdapter; import com.rabbitmq.perf.metrics.PerformanceMetrics; import java.io.IOException; @@ -99,18 +97,17 @@ public Duration interval() { }; static Stream configurationArguments() { - return Stream.of(blockingIoAndNio(multicastParamsConfigurers())); + return Stream.of(blockingIoAndNetty(multicastParamsConfigurers())); } - static Arguments[] blockingIoAndNio(List> multicastParamsConfigurers) { + static Arguments[] blockingIoAndNetty( + List> multicastParamsConfigurers) { List arguments = new ArrayList<>(); for (Consumer configurer : multicastParamsConfigurers) { arguments.add( Arguments.of( configurer, namedConsumer("blocking IO", (Consumer) cf -> {}))); - arguments.add( - Arguments.of( - configurer, namedConsumer("NIO", (Consumer) cf -> cf.useNio()))); + arguments.add(Arguments.of(configurer, namedConsumer("Netty", ConnectionFactory::useNetty))); } return arguments.toArray(new Arguments[0]); @@ -146,7 +143,7 @@ static Stream configurationArgumentsForSeveralUris() { namedConsumer("one server-named queue", empty()), namedConsumer("several queues", severalQueues()), namedConsumer("queue sequence", queueSequence())) - .map(configurer -> Arguments.of(configurer)); + .map(Arguments::of); } static Consumer empty() { @@ -299,23 +296,11 @@ public void shouldRecoverWhenConnectionsAreKilledAndUsingPublishingInterval( } @Test - public void shouldRecoverWithNio(TestInfo info) throws Exception { + public void shouldRecoverWithNetty(TestInfo info) throws Exception { params.setQueueNames(Arrays.asList("one", "two", "three")); params.setProducerCount(10); params.setConsumerCount(10); - cf.useNio(); - cf.setNioParams( - new NioParams() - .setNbIoThreads(10) - // see PerfTest#configureNioIfRequested - .setNioExecutor( - new ThreadPoolExecutor( - 10, - params.getProducerCount() + params.getConsumerCount() + 5, - 30L, - TimeUnit.SECONDS, - new SynchronousQueue<>(), - new NamedThreadFactory("perf-test-nio-")))); + cf.useNetty(); int producerConsumerCount = params.getProducerCount(); MulticastSet set = new MulticastSet(performanceMetrics, cf, params, "", URIS, latchCompletionHandler(1, info)); diff --git a/src/test/java/com/rabbitmq/perf/it/PerfTestIT.java b/src/test/java/com/rabbitmq/perf/it/PerfTestIT.java new file mode 100644 index 00000000..00460a8d --- /dev/null +++ b/src/test/java/com/rabbitmq/perf/it/PerfTestIT.java @@ -0,0 +1,263 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.perf.it; + +import static com.rabbitmq.perf.TestUtils.waitAtMost; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.perf.PerfTest; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.condition.EnabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class PerfTestIT { + + static ExecutorService executor = Executors.newSingleThreadExecutor(); + volatile ByteArrayOutputStream out, err; + volatile RecordingSystemExiter systemExiter; + + @BeforeAll + static void initAll() { + executor = Executors.newSingleThreadExecutor(); + } + + @BeforeEach + void init() { + out = new ByteArrayOutputStream(); + err = new ByteArrayOutputStream(); + systemExiter = new RecordingSystemExiter(); + } + + @AfterAll + static void tearDown() { + executor.shutdownNow(); + } + + @Test + void runWithMessageCountLimitsShouldStop() throws Exception { + int messageCount = 1000; + run(builder().pmessages(messageCount).cmessages(messageCount)); + waitRunEnds(); + assertThat(consoleOutput()).contains("starting", "stopped", "avg"); + } + + @Test + void netty() throws Exception { + int messageCount = 1000; + run(builder().pmessages(messageCount).cmessages(messageCount).nettyThreads(1)); + waitRunEnds(); + assertThat(consoleOutput()).contains("starting", "stopped", "avg"); + } + + @ParameterizedTest + @EnumSource + @Utils.DisabledIfTlsNotEnabled + void tlsShouldConnect(IoLayer ioLayer) throws Exception { + int messageCount = 1000; + run( + builder(ioLayer) + .pmessages(messageCount) + .cmessages(messageCount) + .uri("amqps://localhost") + .sni("localhost")); + waitRunEnds(); + assertThat(consoleOutput()).contains("starting", "stopped", "avg"); + } + + @ParameterizedTest + @EnumSource + @Utils.DisabledIfTlsNotEnabled + void tlsWithDefaultSslContextShouldFail(IoLayer ioLayer) throws Exception { + int messageCount = 1000; + run( + builder(ioLayer) + .pmessages(messageCount) + .cmessages(messageCount) + .uri("amqps://localhost") + .useDefaultSslContext()); + waitRunEnds(1); + assertThat(consoleOutput()).contains("test stopped"); + } + + @Test + @EnabledOnOs(OS.LINUX) + @EnabledIfSystemProperty(named = "os.arch", matches = "amd64") + void nativeEpollWorksOnLinux() throws Exception { + int messageCount = 1000; + run(builder().pmessages(messageCount).cmessages(messageCount).nettyThreads(1).nettyEpoll()); + waitRunEnds(); + assertThat(consoleOutput()).contains("starting", "stopped", "avg"); + } + + @Test + @EnabledOnOs(OS.MAC) + @EnabledIfSystemProperty(named = "os.arch", matches = "aarch64") + void nativeKqueueWorksOnMacOs() throws Exception { + int messageCount = 1000; + run(builder().pmessages(messageCount).cmessages(messageCount).nettyThreads(1).nettyKqueue()); + waitRunEnds(); + assertThat(consoleOutput()).contains("starting", "stopped", "avg"); + } + + private static void waitOneSecond() throws InterruptedException { + wait(Duration.ofSeconds(1)); + } + + private static void wait(Duration duration) throws InterruptedException { + Thread.sleep(duration.toMillis()); + } + + Future run(ArgumentsBuilder builder) { + String[] args = builder.build().split(" "); + return executor.submit( + () -> { + PerfTest.PerfTestOptions options = new PerfTest.PerfTestOptions(); + options.setConsoleOut(new PrintStream(out)); + options.setConsoleErr(new PrintStream(err)); + options.setSystemExiter(systemExiter); + PerfTest.main(args, options); + }); + } + + ArgumentsBuilder builder(IoLayer ioLayer) { + ArgumentsBuilder builder = builder(); + if (ioLayer == IoLayer.NETTY) { + builder.nettyThreads(1); + } + return builder; + } + + ArgumentsBuilder builder() { + return new ArgumentsBuilder(); + } + + static class ArgumentsBuilder { + + private final Map arguments = new HashMap<>(); + + ArgumentsBuilder pmessages(int pmessages) { + return intArgument("pmessages", pmessages); + } + + ArgumentsBuilder cmessages(int cmessages) { + return intArgument("cmessages", cmessages); + } + + ArgumentsBuilder nettyThreads(int threads) { + return intArgument("netty-threads", threads); + } + + ArgumentsBuilder uri(String uri) { + return this.argument("uri", uri); + } + + ArgumentsBuilder sni(String sni) { + return argument("server-name-indication", sni); + } + + ArgumentsBuilder useDefaultSslContext() { + return argument("use-default-ssl-context", "true"); + } + + ArgumentsBuilder nettyEpoll() { + return argument("netty-epoll", "true"); + } + + ArgumentsBuilder nettyKqueue() { + return argument("netty-kqueue", "true"); + } + + ArgumentsBuilder rate(int rate) { + return intArgument("rate", rate); + } + + private ArgumentsBuilder argument(String key, String value) { + this.arguments.put(key, value); + return this; + } + + private ArgumentsBuilder intArgument(String key, int value) { + this.arguments.put(key, String.valueOf(value)); + return this; + } + + String build() { + return this.arguments.entrySet().stream() + .map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue()))) + .collect(Collectors.joining(" ")); + } + } + + private void waitRunEnds() throws Exception { + waitRunEnds(0); + } + + private void waitRunEnds(int expectedExitCode) throws Exception { + waitAtMost( + (int) Duration.ofSeconds(30).getSeconds(), + () -> systemExiter.status() == expectedExitCode, + () -> "Expected " + expectedExitCode + " exit code, got " + systemExiter.status()); + } + + private String consoleOutput() { + return out.toString(); + } + + private String consoleErrorOutput() { + return err.toString(); + } + + private static class RecordingSystemExiter implements PerfTest.SystemExiter { + + private AtomicInteger exitCount = new AtomicInteger(0); + private AtomicInteger lastStatus = new AtomicInteger(-1); + + @Override + public void exit(int status) { + exitCount.incrementAndGet(); + lastStatus.set(status); + } + + boolean notCalled() { + return exitCount.get() == 0; + } + + int status() { + return lastStatus.get(); + } + } + + private enum IoLayer { + NETTY, + SOCKET + } +} diff --git a/src/test/java/com/rabbitmq/perf/it/Utils.java b/src/test/java/com/rabbitmq/perf/it/Utils.java index a7f1edbd..9ea1ba9d 100644 --- a/src/test/java/com/rabbitmq/perf/it/Utils.java +++ b/src/test/java/com/rabbitmq/perf/it/Utils.java @@ -16,10 +16,19 @@ package com.rabbitmq.perf.it; import com.rabbitmq.perf.MulticastSet; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; import java.lang.reflect.Method; import java.util.UUID; import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; final class Utils { @@ -67,4 +76,32 @@ public void countDown(String reason) { latch.countDown(); } } + + static boolean tlsAvailable() { + try { + Process process = Host.rabbitmqctl("status"); + String output = Host.capture(process.getInputStream()); + return output.contains("amqp/ssl"); + } catch (Exception e) { + throw new RuntimeException("Error while trying to detect TLS: " + e.getMessage()); + } + } + + static class DisabledIfTlsNotEnabledCondition implements ExecutionCondition { + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + if (tlsAvailable()) { + return ConditionEvaluationResult.enabled("TLS is enabled"); + } else { + return ConditionEvaluationResult.disabled("TLS is disabled"); + } + } + } + + @Target({ElementType.TYPE, ElementType.METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @Documented + @ExtendWith(DisabledIfTlsNotEnabledCondition.class) + public @interface DisabledIfTlsNotEnabled {} }