Skip to content

Commit b874692

Browse files
committed
Simplify ReactorNettyTcpClient input
Create a ReactorNettyCodec to hold the decoding and encoding function and consumer along with a package-private sub-class that delegates to StompDecoder and StompEncoder. Issue: SPR-14531
1 parent 85c93f5 commit b874692

File tree

5 files changed

+141
-117
lines changed

5 files changed

+141
-117
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java

Lines changed: 5 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,9 @@
1616

1717
package org.springframework.messaging.simp.stomp;
1818

19-
import java.util.List;
20-
import java.util.function.BiConsumer;
21-
import java.util.function.Function;
22-
23-
import io.netty.buffer.ByteBuf;
24-
import reactor.core.scheduler.Schedulers;
25-
26-
import org.springframework.messaging.Message;
2719
import org.springframework.messaging.tcp.TcpOperations;
2820
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
21+
import org.springframework.util.Assert;
2922
import org.springframework.util.concurrent.ListenableFuture;
3023

3124
/**
@@ -38,28 +31,29 @@ public class ReactorNettyTcpStompClient extends StompClientSupport {
3831

3932
private final TcpOperations<byte[]> tcpClient;
4033

34+
4135
/**
4236
* Create an instance with host "127.0.0.1" and port 61613.
4337
*/
4438
public ReactorNettyTcpStompClient() {
4539
this("127.0.0.1", 61613);
4640
}
4741

48-
4942
/**
5043
* Create an instance with the given host and port.
5144
* @param host the host
5245
* @param port the port
5346
*/
54-
public ReactorNettyTcpStompClient(final String host, final int port) {
55-
this.tcpClient = create(host, port, new StompDecoder());
47+
public ReactorNettyTcpStompClient(String host, int port) {
48+
this.tcpClient = new ReactorNettyTcpClient<byte[]>(host, port, new StompReactorNettyCodec());
5649
}
5750

5851
/**
5952
* Create an instance with a pre-configured TCP client.
6053
* @param tcpClient the client to use
6154
*/
6255
public ReactorNettyTcpStompClient(TcpOperations<byte[]> tcpClient) {
56+
Assert.notNull(tcpClient, "'tcpClient' is required");
6357
this.tcpClient = tcpClient;
6458
}
6559

@@ -94,49 +88,4 @@ public void shutdown() {
9488
this.tcpClient.shutdown();
9589
}
9690

97-
/**
98-
* Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for
99-
* encoding, decoding and hand-off.
100-
*
101-
* @param host target host
102-
* @param port target port
103-
* @param decoder {@link StompDecoder} to use
104-
* @return a new {@link TcpOperations}
105-
*/
106-
protected static TcpOperations<byte[]> create(String host, int port, StompDecoder decoder) {
107-
return new ReactorNettyTcpClient<>(host, port,
108-
new ReactorNettyTcpClient.MessageHandlerConfiguration<>(
109-
new DecodingFunction(decoder),
110-
new EncodingConsumer(new StompEncoder()),
111-
128,
112-
Schedulers.newParallel("StompClient")));
113-
}
114-
115-
private static final class EncodingConsumer implements BiConsumer<ByteBuf, Message<byte[]>> {
116-
117-
private final StompEncoder encoder;
118-
119-
public EncodingConsumer(StompEncoder encoder) {
120-
this.encoder = encoder;
121-
}
122-
123-
@Override
124-
public void accept(ByteBuf byteBuf, Message<byte[]> message) {
125-
byteBuf.writeBytes(this.encoder.encode(message));
126-
}
127-
}
128-
129-
private static final class DecodingFunction implements Function<ByteBuf, List<Message<byte[]>>> {
130-
131-
private final StompDecoder decoder;
132-
133-
public DecodingFunction(StompDecoder decoder) {
134-
this.decoder = decoder;
135-
}
136-
137-
@Override
138-
public List<Message<byte[]>> apply(ByteBuf buffer) {
139-
return this.decoder.decode(buffer.nioBuffer());
140-
}
141-
}
14291
}

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.springframework.messaging.tcp.TcpConnection;
4141
import org.springframework.messaging.tcp.TcpConnectionHandler;
4242
import org.springframework.messaging.tcp.TcpOperations;
43+
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
4344
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
4445
import org.springframework.util.Assert;
4546
import org.springframework.util.concurrent.ListenableFuture;
@@ -387,8 +388,8 @@ protected void startInternal() {
387388
if (this.tcpClient == null) {
388389
StompDecoder decoder = new StompDecoder();
389390
decoder.setHeaderInitializer(getHeaderInitializer());
390-
391-
this.tcpClient = ReactorNettyTcpStompClient.create(this.relayHost, this.relayPort, decoder);
391+
ReactorNettyCodec<byte[]> codec = new StompReactorNettyCodec(decoder);
392+
this.tcpClient = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec);
392393
}
393394

394395
if (logger.isInfoEnabled()) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.simp.stomp;
17+
18+
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
19+
20+
/**
21+
* {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and
22+
* {@link StompEncoder}.
23+
*
24+
* @author Rossen Stoyanchev
25+
* @since 5.0
26+
*/
27+
class StompReactorNettyCodec extends ReactorNettyCodec<byte[]> {
28+
29+
public StompReactorNettyCodec() {
30+
this(new StompDecoder(), new StompEncoder());
31+
}
32+
33+
public StompReactorNettyCodec(StompDecoder decoder) {
34+
this(decoder, new StompEncoder());
35+
}
36+
37+
public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) {
38+
super(byteBuf -> decoder.decode(byteBuf.nioBuffer()),
39+
(byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message)));
40+
}
41+
42+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.tcp.reactor;
17+
18+
import java.util.Collection;
19+
import java.util.function.BiConsumer;
20+
import java.util.function.Function;
21+
22+
import io.netty.buffer.ByteBuf;
23+
24+
import org.springframework.messaging.Message;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* Simple holder for a decoding {@link Function} and an encoding
29+
* {@link BiConsumer} to use with Reactor Netty.
30+
*
31+
* @author Rossen Stoyanchev
32+
* @since 5.0
33+
*/
34+
public class ReactorNettyCodec<P> {
35+
36+
private final Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder;
37+
38+
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
39+
40+
41+
public ReactorNettyCodec(Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
42+
BiConsumer<? super ByteBuf, ? super Message<P>> encoder) {
43+
44+
Assert.notNull(decoder, "'decoder' is required");
45+
Assert.notNull(encoder, "'encoder' is required");
46+
this.decoder = decoder;
47+
this.encoder = encoder;
48+
}
49+
50+
public Function<? super ByteBuf, ? extends Collection<Message<P>>> getDecoder() {
51+
return this.decoder;
52+
}
53+
54+
public BiConsumer<? super ByteBuf, ? super Message<P>> getEncoder() {
55+
return this.encoder;
56+
}
57+
58+
}

0 commit comments

Comments
 (0)