From b2f83e322cac2eea5ece8fc0a96d29d81b141759 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Tue, 11 Jun 2024 01:52:54 +0200 Subject: [PATCH 01/14] first implementation --- .../zeromq/dsl/ZeroMqMessageHandlerSpec.java | 22 +++++ .../zeromq/outbound/ZeroMqMessageHandler.java | 91 +++++++++++++++++-- .../outbound/ZeroMqMessageHandlerTests.java | 38 ++++++++ 3 files changed, 143 insertions(+), 8 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index b99fe26996c..2b71ff5fd1c 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -52,6 +52,16 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) { this(context, () -> connectUrl); } + /** + * Create an instance based on the provided {@link ZContext} and binding port. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @since 6.2.6 + */ + public ZeroMqMessageHandlerSpec(ZContext context, int port) { + this(context, port, SocketType.PAIR); + } + /** * Create an instance based on the provided {@link ZContext} and connection string supplier. * @param context the {@link ZContext} to use for creating sockets. @@ -73,6 +83,18 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketTy this(context, () -> connectUrl, socketType); } + /** + * Create an instance based on the provided {@link ZContext}, binding port and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + * @since 6.2.6 + */ + public ZeroMqMessageHandlerSpec(ZContext context, int port, SocketType socketType) { + super(new ZeroMqMessageHandler(context, port, socketType)); + } + /** * Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}. * @param context the {@link ZContext} to use for creating sockets. diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index 2a668148318..46def69b705 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -43,6 +44,7 @@ import org.springframework.integration.mapping.OutboundMessageMapper; import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.management.ManageableLifecycle; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.Assert; @@ -50,14 +52,14 @@ /** * The {@link AbstractReactiveMessageHandler} implementation for publishing messages over ZeroMq socket. * Only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. - * This component is only connecting (no Binding) to another side, e.g. ZeroMq proxy. + * This component can bind or connect the socket. *

* When the {@link SocketType#PUB} is used, the {@link #topicExpression} is evaluated against a * request message to inject a topic frame into a ZeroMq message if it is not {@code null}. * The subscriber side must receive the topic frame first before parsing the actual data. *

* When the payload of the request message is a {@link ZMsg}, no any conversion and topic extraction happen: - * the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing. + * the {@link ZMsg} is sent into a socket as is, and it is not destroyed for possible further reusing. * * @author Artem Bilan * @author Alessio Matricardi @@ -74,7 +76,7 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private final Scheduler publisherScheduler = Schedulers.newSingle("zeroMqMessageHandlerScheduler"); - private final Mono socketMono; + private volatile Mono socketMono; private OutboundMessageMapper messageMapper; @@ -91,6 +93,23 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private volatile boolean wrapTopic = true; + private final ZContext context; + + private final SocketType socketType; + + private final AtomicInteger bindPort = new AtomicInteger(); + + @Nullable + private String connectUrl; + + private ZeroMqMessageHandler(ZContext context, SocketType socketType) { + Assert.notNull(context, "'context' must not be null"); + Assert.state(VALID_SOCKET_TYPES.contains(socketType), + () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES); + this.context = context; + this.socketType = socketType; + } + /** * Create an instance based on the provided {@link ZContext} and connection string. * @param context the {@link ZContext} to use for creating sockets. @@ -100,6 +119,16 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl) { this(context, connectUrl, SocketType.PAIR); } + /** + * Create an instance based on the provided {@link ZContext} and binding port. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @since 6.2.6 + */ + public ZeroMqMessageHandler(ZContext context, int port) { + this(context, port, SocketType.PAIR); + } + /** * Create an instance based on the provided {@link ZContext} and connection string supplier. * @param context the {@link ZContext} to use for creating sockets. @@ -122,6 +151,26 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType sock Assert.hasText(connectUrl, "'connectUrl' must not be empty"); } + /** + * Create an instance based on the provided {@link ZContext}, binding port and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param port the port to bind ZeroMq socket to over TCP. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + * @since 6.2.6 + */ + public ZeroMqMessageHandler(ZContext context, int port, SocketType socketType) { + this(context, socketType); + this.bindPort.set(port); + this.socketMono = + Mono.just(this.context.createSocket(this.socketType)) + .publishOn(this.publisherScheduler) + .doOnNext((socket) -> this.socketConfigurer.accept(socket)) + .doOnNext((socket) -> this.bindPort.set(bindSocket(socket, this.bindPort.get()))) + .cache() + .publishOn(this.publisherScheduler); + } + /** * Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}. * @param context the {@link ZContext} to use for creating sockets. @@ -131,15 +180,16 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType sock * @since 5.5.9 */ public ZeroMqMessageHandler(ZContext context, Supplier connectUrl, SocketType socketType) { - Assert.notNull(context, "'context' must not be null"); + this(context, socketType); Assert.notNull(connectUrl, "'connectUrl' must not be null"); - Assert.state(VALID_SOCKET_TYPES.contains(socketType), - () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES); this.socketMono = - Mono.just(context.createSocket(socketType)) + Mono.just(this.context.createSocket(this.socketType)) .publishOn(this.publisherScheduler) .doOnNext((socket) -> this.socketConfigurer.accept(socket)) - .doOnNext((socket) -> socket.connect(connectUrl.get())) + .doOnNext((socket) -> { + socket.connect(connectUrl.get()); + this.connectUrl = connectUrl.get(); + }) .cache() .publishOn(this.publisherScheduler); } @@ -206,6 +256,16 @@ public void wrapTopic(boolean wrapTopic) { this.wrapTopic = wrapTopic; } + /** + * Return the port a socket is bound or 0 if this message producer has not been started yet + * or the socket is connected - not bound. + * @return the port for a socket or 0. + * @since 6.2.6 + */ + public int getBoundPort() { + return this.bindPort.get(); + } + @Override public String getComponentType() { return "zeromq:outbound-channel-adapter"; @@ -214,6 +274,8 @@ public String getComponentType() { @Override protected void onInit() { super.onInit(); + Assert.state(this.connectUrl == null || this.bindPort.get() == 0, + "Only one of the 'connectUrl' or 'bindPort' must be provided"); BeanFactory beanFactory = getBeanFactory(); this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(beanFactory); if (this.messageMapper == null) { @@ -283,4 +345,17 @@ public void destroy() { this.publisherScheduler.dispose(); } + private static int bindSocket(ZMQ.Socket socket, int port) { + if (port == 0) { + return socket.bindToRandomPort("tcp://*"); + } + else { + boolean bound = socket.bind("tcp://*:" + port); + if (!bound) { + throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port); + } + return port; + } + } + } diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java index 1431047c437..a7abf452442 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java @@ -35,6 +35,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.converter.ByteArrayMessageConverter; import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.util.TestSocketUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -187,4 +188,41 @@ void testMessageHandlerForPubSubDisabledWrapTopic() { subSocket.close(); } + @Test + void testMessageHandlerForPubSubWithBind() { + int boundPort = TestSocketUtils.findAvailableTcpPort(); + ZeroMqMessageHandler messageHandler = + new ZeroMqMessageHandler(CONTEXT, boundPort, SocketType.PUB); + messageHandler.setBeanFactory(mock(BeanFactory.class)); + messageHandler.setTopicExpression( + new FunctionExpression>((message) -> message.getHeaders().get("topic"))); + messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); + messageHandler.wrapTopic(false); + messageHandler.afterPropertiesSet(); + messageHandler.start(); + + ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB); + subSocket.setReceiveTimeOut(0); + subSocket.connect("tcp://localhost:" + boundPort); + subSocket.subscribe("test"); + + Message testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build(); + + await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> { + subSocket.subscribe("test"); + messageHandler.handleMessage(testMessage).subscribe(); + ZMsg msg = ZMsg.recvMsg(subSocket); + assertThat(msg).isNotNull(); + assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic"); + Message capturedMessage = + new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData()); + assertThat(capturedMessage).isEqualTo(testMessage); + msg.destroy(); + }); + + messageHandler.destroy(); + subSocket.close(); + } + } From 4630c865b26cc925f02d69aff1eb2f7562da5e88 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Tue, 11 Jun 2024 10:28:45 +0200 Subject: [PATCH 02/14] add docs --- .../antora/modules/ROOT/pages/zeromq.adoc | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/reference/antora/modules/ROOT/pages/zeromq.adoc b/src/reference/antora/modules/ROOT/pages/zeromq.adoc index a0b85efdfee..8d9d8442719 100644 --- a/src/reference/antora/modules/ROOT/pages/zeromq.adoc +++ b/src/reference/antora/modules/ROOT/pages/zeromq.adoc @@ -146,7 +146,9 @@ ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel out The `ZeroMqMessageHandler` is a `ReactiveMessageHandler` implementation to produce publish messages into a ZeroMQ socket. Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported. -The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported. +This component can connect to the remote socket or bind onto TCP protocol with the provided or random port. +The actual port can be obtained via `getBoundPort()` after this component is started and ZeroMQ socket is bound. + When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data. @@ -158,7 +160,7 @@ Otherwise, an `OutboundMessageMapper` is used to convert a request messa By default, a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`. The socket options (e.g. security or write timeout) can be configured via `setSocketConfigurer(Consumer socketConfigurer)` callback. -Here is a sample of `ZeroMqMessageHandler` configuration: +Here is a sample of `ZeroMqMessageHandler` configuration which connect to a socket: [source,java] ---- @@ -173,6 +175,21 @@ ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) { } ---- +Here is a sample of `ZeroMqMessageHandler` configuration which bind to a provided port: + +[source,java] +---- +@Bean +@ServiceActivator(inputChannel = "zeroMqPublisherChannel") +ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) { + ZeroMqMessageHandler messageHandler = + new ZeroMqMessageHandler(context, 7070, SocketType.PUB); + messageHandler.setTopicExpression( + new FunctionExpression>((message) -> message.getHeaders().get("topic"))); + messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); +} +---- + [[zeromq-dsl]] == ZeroMQ Java DSL Support From cecc3c588bfd553f493e8a3e45f72adac471b91d Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:42:56 +0200 Subject: [PATCH 03/14] protected constructor in ZeroMqMessageHandlerSpec, expose them via ZeroMq --- .../integration/zeromq/dsl/ZeroMq.java | 26 +++++++++++++++++++ .../zeromq/dsl/ZeroMqMessageHandlerSpec.java | 8 +++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java index 74a2c530f9f..6b02763da2e 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java @@ -25,6 +25,7 @@ * Factory class for ZeroMq components DSL. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -58,6 +59,17 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, return outboundChannelAdapter(context, () -> connectUrl); } + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} and binding port. + * @param context the {@link ZContext} to use. + * @param port the port to bind ZeroMq socket to over TCP. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port) { + return new ZeroMqMessageHandlerSpec(context, port); + } + /** * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} * and connection URL supplier. @@ -84,6 +96,20 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType); } + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}, binding port + * and {@link SocketType}. + * @param context the {@link ZContext} to use. + * @param port the port to bind ZeroMq socket to over TCP. + * @param socketType the {@link SocketType} for ZeroMq socket. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port, + SocketType socketType) { + return new ZeroMqMessageHandlerSpec(context, port, socketType); + } + /** * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}, * connection URL supplier and {@link SocketType}. diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index 2b71ff5fd1c..570e7531a4c 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -56,9 +56,9 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) { * Create an instance based on the provided {@link ZContext} and binding port. * @param context the {@link ZContext} to use for creating sockets. * @param port the port to bind ZeroMq socket to over TCP. - * @since 6.2.6 + * @since 6.4 */ - public ZeroMqMessageHandlerSpec(ZContext context, int port) { + protected ZeroMqMessageHandlerSpec(ZContext context, int port) { this(context, port, SocketType.PAIR); } @@ -89,9 +89,9 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketTy * @param port the port to bind ZeroMq socket to over TCP. * @param socketType the {@link SocketType} to use; * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. - * @since 6.2.6 + * @since 6.4 */ - public ZeroMqMessageHandlerSpec(ZContext context, int port, SocketType socketType) { + protected ZeroMqMessageHandlerSpec(ZContext context, int port, SocketType socketType) { super(new ZeroMqMessageHandler(context, port, socketType)); } From 6e06a0912efed9f01e70ec93e7178e859a13b365 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:44:03 +0200 Subject: [PATCH 04/14] introduce ZeroMqUtils, for common Zero MQ utilities functions --- .../integration/zeromq/ZeroMqUtils.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java new file mode 100644 index 00000000000..465f8fe3198 --- /dev/null +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.zeromq; + +import org.zeromq.ZMQ; + +/** + * Module that wraps common methods of ZeroMq integration classes + * + * @author Alessio Matricardi + * + * @since 6.4 + */ +public final class ZeroMqUtils { + + /** + * Binds the ZeroMq socket to the given port when different from 0 + * or to a random port if not. + * @param socket the ZeroMq socket + * @param port the port to bind ZeroMq socket to over TCP. + * @return the effectively bound port + * @since 6.4 + */ + public static int bindSocket(ZMQ.Socket socket, int port) { + if (port == 0) { + return socket.bindToRandomPort("tcp://*"); + } + else { + boolean bound = socket.bind("tcp://*:" + port); + if (!bound) { + throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port); + } + return port; + } + } + + private ZeroMqUtils() { + } + +} From 1604b3c925ba99164232213db7464929376cb5e2 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:44:30 +0200 Subject: [PATCH 05/14] use ZeroMqUtils.bindSocket in ZeroMqMessageProducer --- .../zeromq/inbound/ZeroMqMessageProducer.java | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index d2c484c3e19..1d203315916 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -40,6 +40,7 @@ import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.integration.zeromq.ZeroMqHeaders; +import org.springframework.integration.zeromq.ZeroMqUtils; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.lang.Nullable; @@ -263,7 +264,7 @@ protected void doStart() { socket.connect(this.connectUrl); } else { - this.bindPort.set(bindSocket(socket, this.bindPort.get())); + this.bindPort.set(ZeroMqUtils.bindSocket(socket, this.bindPort.get())); } }) .cache() @@ -319,17 +320,4 @@ public void destroy() { this.socketMono.doOnNext(ZMQ.Socket::close).block(); } - private static int bindSocket(ZMQ.Socket socket, int port) { - if (port == 0) { - return socket.bindToRandomPort("tcp://*"); - } - else { - boolean bound = socket.bind("tcp://*:" + port); - if (!bound) { - throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port); - } - return port; - } - } - } From 89e78022f10ff6e5e3428b3e27052334976d0810 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:46:52 +0200 Subject: [PATCH 06/14] refactor ZeroMqMessageHandler providing connectUrl and bindPort setters and simple constructors, following the same logic used for ZeroMqMessageProvider --- .../zeromq/outbound/ZeroMqMessageHandler.java | 92 ++++++++++++------- 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index 46def69b705..c2170b91acf 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -44,6 +44,7 @@ import org.springframework.integration.mapping.OutboundMessageMapper; import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.management.ManageableLifecycle; +import org.springframework.integration.zeromq.ZeroMqUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; @@ -100,9 +101,13 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private final AtomicInteger bindPort = new AtomicInteger(); @Nullable - private String connectUrl; + private Supplier connectUrl; - private ZeroMqMessageHandler(ZContext context, SocketType socketType) { + public ZeroMqMessageHandler(ZContext context) { + this(context, SocketType.PAIR); + } + + public ZeroMqMessageHandler(ZContext context, SocketType socketType) { Assert.notNull(context, "'context' must not be null"); Assert.state(VALID_SOCKET_TYPES.contains(socketType), () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES); @@ -123,7 +128,7 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl) { * Create an instance based on the provided {@link ZContext} and binding port. * @param context the {@link ZContext} to use for creating sockets. * @param port the port to bind ZeroMq socket to over TCP. - * @since 6.2.6 + * @since 6.4 */ public ZeroMqMessageHandler(ZContext context, int port) { this(context, port, SocketType.PAIR); @@ -157,18 +162,12 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType sock * @param port the port to bind ZeroMq socket to over TCP. * @param socketType the {@link SocketType} to use; * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. - * @since 6.2.6 + * @since 6.4 */ public ZeroMqMessageHandler(ZContext context, int port, SocketType socketType) { this(context, socketType); + Assert.isTrue(port > 0, "'port' must not be zero or negative"); this.bindPort.set(port); - this.socketMono = - Mono.just(this.context.createSocket(this.socketType)) - .publishOn(this.publisherScheduler) - .doOnNext((socket) -> this.socketConfigurer.accept(socket)) - .doOnNext((socket) -> this.bindPort.set(bindSocket(socket, this.bindPort.get()))) - .cache() - .publishOn(this.publisherScheduler); } /** @@ -182,16 +181,7 @@ public ZeroMqMessageHandler(ZContext context, int port, SocketType socketType) { public ZeroMqMessageHandler(ZContext context, Supplier connectUrl, SocketType socketType) { this(context, socketType); Assert.notNull(connectUrl, "'connectUrl' must not be null"); - this.socketMono = - Mono.just(this.context.createSocket(this.socketType)) - .publishOn(this.publisherScheduler) - .doOnNext((socket) -> this.socketConfigurer.accept(socket)) - .doOnNext((socket) -> { - socket.connect(connectUrl.get()); - this.connectUrl = connectUrl.get(); - }) - .cache() - .publishOn(this.publisherScheduler); + this.connectUrl = connectUrl; } /** @@ -256,11 +246,42 @@ public void wrapTopic(boolean wrapTopic) { this.wrapTopic = wrapTopic; } + /** + * Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}. + * Mutually exclusive with the {@link #setBindPort(int)}. + * @param connectUrl the URL to connect ZeroMq socket to. + * @since 6.4 + */ + public void setConnectUrl(@Nullable String connectUrl) { + this.setConnectUrl(() -> connectUrl); + } + + /** + * Configure an URL supplier for {@link org.zeromq.ZMQ.Socket#connect(String)}. + * Mutually exclusive with the {@link #setBindPort(int)}. + * @param connectUrl the supplier for URL to connect the socket to. + * @since 6.4 + */ + public void setConnectUrl(@Nullable Supplier connectUrl) { + this.connectUrl = connectUrl; + } + + /** + * Configure a port for TCP protocol binding via {@link org.zeromq.ZMQ.Socket#bind(String)}. + * Mutually exclusive with the {@link #setConnectUrl(String)}. + * @param port the port to bind ZeroMq socket to over TCP. + * @since 6.4 + */ + public void setBindPort(int port) { + Assert.isTrue(port > 0, "'port' must not be zero or negative"); + this.bindPort.set(port); + } + /** * Return the port a socket is bound or 0 if this message producer has not been started yet * or the socket is connected - not bound. * @return the port for a socket or 0. - * @since 6.2.6 + * @since 6.4 */ public int getBoundPort() { return this.bindPort.get(); @@ -290,6 +311,20 @@ protected void onInit() { @Override public void start() { if (!this.running.getAndSet(true)) { + this.socketMono = + Mono.just(this.context.createSocket(this.socketType)) + .publishOn(this.publisherScheduler) + .doOnNext((socket) -> this.socketConfigurer.accept(socket)) + .doOnNext((socket) -> { + if (this.connectUrl != null) { + socket.connect(this.connectUrl.get()); + } + else { + this.bindPort.set(ZeroMqUtils.bindSocket(socket, this.bindPort.get())); + } + }) + .cache() + .publishOn(this.publisherScheduler); this.socketMonoSubscriber = this.socketMono.subscribe(); } } @@ -345,17 +380,4 @@ public void destroy() { this.publisherScheduler.dispose(); } - private static int bindSocket(ZMQ.Socket socket, int port) { - if (port == 0) { - return socket.bindToRandomPort("tcp://*"); - } - else { - boolean bound = socket.bind("tcp://*:" + port); - if (!bound) { - throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port); - } - return port; - } - } - } From 71e6276b5cd3130c81e2631e627a491f7b3f509e Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:47:49 +0200 Subject: [PATCH 07/14] fix tests to follow the new ZeroMqMessageHandler implementation --- .../integration/zeromq/outbound/ZeroMqMessageHandlerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java index a7abf452442..79920c8b5aa 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java @@ -66,12 +66,12 @@ void testMessageHandlerForPair() { messageHandler.setBeanFactory(mock(BeanFactory.class)); messageHandler.setSocketConfigurer(s -> s.setZapDomain("global")); messageHandler.afterPropertiesSet(); + messageHandler.start(); @SuppressWarnings("unchecked") Mono socketMono = TestUtils.getPropertyValue(messageHandler, "socketMono", Mono.class); ZMQ.Socket socketInUse = socketMono.block(Duration.ofSeconds(10)); assertThat(socketInUse.getZapDomain()).isEqualTo("global"); - messageHandler.start(); Message testMessage = new GenericMessage<>("test"); messageHandler.handleMessage(testMessage).subscribe(); From 7937f1c08fe005be02b6963a241311114725106b Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:47:55 +0200 Subject: [PATCH 08/14] fix typo --- .../org/springframework/integration/zeromq/ZeroMqHeaders.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java index 838ed6be9d5..506e68e00b1 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package org.springframework.integration.zeromq; /** - * The message headers constants to repsent ZeroMq message attributes. + * The message headers constants to represent ZeroMq message attributes. * * @author Artem Bilan * From d0c1e07c541a49df39ac23d5b867e3ba652f2ed5 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Mon, 17 Jun 2024 22:48:08 +0200 Subject: [PATCH 09/14] add new updates to whats-new.adoc --- src/reference/antora/modules/ROOT/pages/whats-new.adoc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 51e10351291..9328aec8297 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -30,4 +30,9 @@ The references stay in cache because polling configuration does not allow to pro The `LobHandler` (and respective API) has been deprecated for removal in Spring Framework `6.2`. Respective option on `JdbcMessageStore` (and similar) have been deprecated as well. -The byte array handling for serialized message is fully deferred to JDBC driver. \ No newline at end of file +The byte array handling for serialized message is fully deferred to JDBC driver. + +[[x6.4-zeromq-changes]] +=== ZeroMQ Changes + +The outbound component `ZeroMqMessageHandler` (and respective API) can now bind a TCP port instead of connecting to a given URL. \ No newline at end of file From f8ecae039a5016f5d6cc8d85ba56a98dc4c43f7a Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 20 Jun 2024 19:03:44 +0200 Subject: [PATCH 10/14] address ZeroMQUtils comments --- .../springframework/integration/zeromq/ZeroMqUtils.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java index 465f8fe3198..4c94ed91655 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java @@ -23,17 +23,14 @@ * * @author Alessio Matricardi * - * @since 6.4 */ public final class ZeroMqUtils { /** - * Binds the ZeroMq socket to the given port when different from 0 - * or to a random port if not. + * Bind the ZeroMq socket to the given port over the TCP transport protocol. * @param socket the ZeroMq socket - * @param port the port to bind ZeroMq socket to over TCP. + * @param port the port to bind ZeroMq socket to over TCP. If equal to 0, the socket will bind to a random port. * @return the effectively bound port - * @since 6.4 */ public static int bindSocket(ZMQ.Socket socket, int port) { if (port == 0) { From 7a1f7eebc55af0c149e941f834c10492f2e6c272 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 20 Jun 2024 19:04:31 +0200 Subject: [PATCH 11/14] remove connectUrl and boundPort setters, add Javadoc to new constructors --- .../zeromq/outbound/ZeroMqMessageHandler.java | 44 +++++-------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index c2170b91acf..8074d860d4a 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -103,10 +103,21 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler @Nullable private Supplier connectUrl; + /** + * Create an instance based on the provided {@link ZContext}. + * @param context the {@link ZContext} to use for creating sockets. + * @since 6.4 + */ public ZeroMqMessageHandler(ZContext context) { this(context, SocketType.PAIR); } + /** + * Create an instance based on the provided {@link ZContext} and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + */ public ZeroMqMessageHandler(ZContext context, SocketType socketType) { Assert.notNull(context, "'context' must not be null"); Assert.state(VALID_SOCKET_TYPES.contains(socketType), @@ -246,37 +257,6 @@ public void wrapTopic(boolean wrapTopic) { this.wrapTopic = wrapTopic; } - /** - * Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}. - * Mutually exclusive with the {@link #setBindPort(int)}. - * @param connectUrl the URL to connect ZeroMq socket to. - * @since 6.4 - */ - public void setConnectUrl(@Nullable String connectUrl) { - this.setConnectUrl(() -> connectUrl); - } - - /** - * Configure an URL supplier for {@link org.zeromq.ZMQ.Socket#connect(String)}. - * Mutually exclusive with the {@link #setBindPort(int)}. - * @param connectUrl the supplier for URL to connect the socket to. - * @since 6.4 - */ - public void setConnectUrl(@Nullable Supplier connectUrl) { - this.connectUrl = connectUrl; - } - - /** - * Configure a port for TCP protocol binding via {@link org.zeromq.ZMQ.Socket#bind(String)}. - * Mutually exclusive with the {@link #setConnectUrl(String)}. - * @param port the port to bind ZeroMq socket to over TCP. - * @since 6.4 - */ - public void setBindPort(int port) { - Assert.isTrue(port > 0, "'port' must not be zero or negative"); - this.bindPort.set(port); - } - /** * Return the port a socket is bound or 0 if this message producer has not been started yet * or the socket is connected - not bound. @@ -295,8 +275,6 @@ public String getComponentType() { @Override protected void onInit() { super.onInit(); - Assert.state(this.connectUrl == null || this.bindPort.get() == 0, - "Only one of the 'connectUrl' or 'bindPort' must be provided"); BeanFactory beanFactory = getBeanFactory(); this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(beanFactory); if (this.messageMapper == null) { From bcac0a753ecd92b0700ac514e712486c1755a4aa Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 20 Jun 2024 19:05:18 +0200 Subject: [PATCH 12/14] add new DLS constructor for random port --- .../zeromq/dsl/ZeroMqMessageHandlerSpec.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index 570e7531a4c..ab555823720 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -52,6 +52,15 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) { this(context, () -> connectUrl); } + /** + * Create an instance based on the provided {@link ZContext}. + * @param context the {@link ZContext} to use for creating sockets. + * @since 6.4 + */ + protected ZeroMqMessageHandlerSpec(ZContext context) { + this(context, SocketType.PAIR); + } + /** * Create an instance based on the provided {@link ZContext} and binding port. * @param context the {@link ZContext} to use for creating sockets. @@ -83,6 +92,17 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketTy this(context, () -> connectUrl, socketType); } + /** + * Create an instance based on the provided {@link ZContext} and {@link SocketType}. + * @param context the {@link ZContext} to use for creating sockets. + * @param socketType the {@link SocketType} to use; + * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported. + * @since 6.4 + */ + protected ZeroMqMessageHandlerSpec(ZContext context, SocketType socketType) { + super(new ZeroMqMessageHandler(context, socketType)); + } + /** * Create an instance based on the provided {@link ZContext}, binding port and {@link SocketType}. * @param context the {@link ZContext} to use for creating sockets. From 1348389bb953e3a3823fb96725913193f834df85 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Sat, 29 Jun 2024 18:05:00 +0200 Subject: [PATCH 13/14] add since closure in ZeroMqUtils --- .../org/springframework/integration/zeromq/ZeroMqUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java index 4c94ed91655..4863f7329b1 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqUtils.java @@ -23,6 +23,8 @@ * * @author Alessio Matricardi * + * @since 6.4 + * */ public final class ZeroMqUtils { From 920c1279475db8897344a907d438f84bb4ad717b Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Sat, 29 Jun 2024 18:16:02 +0200 Subject: [PATCH 14/14] add DSL support methods and specific that, when not defined, the socket will be bound to a random port --- .../integration/zeromq/dsl/ZeroMq.java | 23 +++++++++++++++++++ .../zeromq/dsl/ZeroMqMessageHandlerSpec.java | 2 ++ 2 files changed, 25 insertions(+) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java index 6b02763da2e..8aa72615f47 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java @@ -96,6 +96,29 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType); } + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}. + * The created socket will be bound to a random port. + * @param context the {@link ZContext} to use. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context) { + return new ZeroMqMessageHandlerSpec(context); + } + + /** + * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} and {@link SocketType}. + * The created socket will be bound to a random port. + * @param context the {@link ZContext} to use. + * @param socketType the {@link SocketType} for ZeroMq socket. + * @return the spec. + * @since 6.4 + */ + public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, SocketType socketType) { + return new ZeroMqMessageHandlerSpec(context, socketType); + } + /** * Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}, binding port * and {@link SocketType}. diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index ab555823720..393d1ca3bc4 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -54,6 +54,7 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) { /** * Create an instance based on the provided {@link ZContext}. + * The created socket will be bound to a random port. * @param context the {@link ZContext} to use for creating sockets. * @since 6.4 */ @@ -94,6 +95,7 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketTy /** * Create an instance based on the provided {@link ZContext} and {@link SocketType}. + * The created socket will be bound to a random port. * @param context the {@link ZContext} to use for creating sockets. * @param socketType the {@link SocketType} to use; * only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.