Skip to content

Commit 85c93f5

Browse files
committed
Polish Reactor Netty TCP client support
1 parent 870f61f commit 85c93f5

File tree

7 files changed

+165
-175
lines changed

7 files changed

+165
-175
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@
2929
import org.springframework.util.concurrent.ListenableFuture;
3030

3131
/**
32-
* A STOMP over TCP client that uses
33-
* {@link ReactorNettyTcpClient}.
32+
* A STOMP over TCP client that uses {@link ReactorNettyTcpClient}.
3433
*
3534
* @author Rossen Stoyanchev
36-
* @since 4.2
35+
* @since 5.0
3736
*/
3837
public class ReactorNettyTcpStompClient extends StompClientSupport {
3938

@@ -99,25 +98,21 @@ public void shutdown() {
9998
* Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for
10099
* encoding, decoding and hand-off.
101100
*
102-
* @param relayHost target host
103-
* @param relayPort target port
101+
* @param host target host
102+
* @param port target port
104103
* @param decoder {@link StompDecoder} to use
105104
* @return a new {@link TcpOperations}
106105
*/
107-
protected static TcpOperations<byte[]> create(String relayHost,
108-
int relayPort,
109-
StompDecoder decoder) {
110-
return new ReactorNettyTcpClient<>(relayHost,
111-
relayPort,
112-
new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction(
113-
decoder),
106+
protected static TcpOperations<byte[]> create(String host, int port, StompDecoder decoder) {
107+
return new ReactorNettyTcpClient<>(host, port,
108+
new ReactorNettyTcpClient.MessageHandlerConfiguration<>(
109+
new DecodingFunction(decoder),
114110
new EncodingConsumer(new StompEncoder()),
115111
128,
116112
Schedulers.newParallel("StompClient")));
117113
}
118114

119-
private static final class EncodingConsumer
120-
implements BiConsumer<ByteBuf, Message<byte[]>> {
115+
private static final class EncodingConsumer implements BiConsumer<ByteBuf, Message<byte[]>> {
121116

122117
private final StompEncoder encoder;
123118

@@ -127,12 +122,11 @@ public EncodingConsumer(StompEncoder encoder) {
127122

128123
@Override
129124
public void accept(ByteBuf byteBuf, Message<byte[]> message) {
130-
byteBuf.writeBytes(encoder.encode(message));
125+
byteBuf.writeBytes(this.encoder.encode(message));
131126
}
132127
}
133128

134-
private static final class DecodingFunction
135-
implements Function<ByteBuf, List<Message<byte[]>>> {
129+
private static final class DecodingFunction implements Function<ByteBuf, List<Message<byte[]>>> {
136130

137131
private final StompDecoder decoder;
138132

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.messaging.tcp.reactor;
1818

1919
import java.time.Duration;
20-
import java.util.Objects;
2120
import java.util.concurrent.ExecutionException;
2221
import java.util.concurrent.TimeUnit;
2322
import java.util.concurrent.TimeoutException;
@@ -33,51 +32,53 @@
3332
import org.springframework.util.concurrent.SuccessCallback;
3433

3534
/**
36-
* Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting
37-
* the result Object type {@code <S>} to the expected target type {@code <T>}.
35+
* Adapts {@link Mono} to {@link ListenableFuture} optionally converting the
36+
* result Object type {@code <S>} to the expected target type {@code <T>}.
3837
*
3938
* @author Rossen Stoyanchev
40-
* @since 4.0
39+
* @since 5.0
4140
* @param <S> the type of object expected from the {@link Mono}
4241
* @param <T> the type of object expected from the {@link ListenableFuture}
4342
*/
44-
abstract class AbstractMonoToListenableFutureAdapter<S, T>
45-
implements ListenableFuture<T> {
43+
abstract class AbstractMonoToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
4644

47-
private final MonoProcessor<S> promise;
45+
private final MonoProcessor<S> monoProcessor;
4846

4947
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>();
5048

51-
protected AbstractMonoToListenableFutureAdapter(Mono<S> promise) {
52-
Assert.notNull(promise, "Mono must not be null");
53-
this.promise = promise.doOnSuccess(result -> {
54-
T adapted;
55-
try {
56-
adapted = adapt(result);
57-
}
58-
catch (Throwable ex) {
59-
registry.failure(ex);
60-
return;
61-
}
62-
registry.success(adapted);
63-
})
64-
.doOnError(registry::failure)
65-
.subscribe();
49+
50+
protected AbstractMonoToListenableFutureAdapter(Mono<S> mono) {
51+
Assert.notNull(mono, "'mono' must not be null");
52+
this.monoProcessor = mono
53+
.doOnSuccess(result -> {
54+
T adapted;
55+
try {
56+
adapted = adapt(result);
57+
}
58+
catch (Throwable ex) {
59+
registry.failure(ex);
60+
return;
61+
}
62+
registry.success(adapted);
63+
})
64+
.doOnError(this.registry::failure)
65+
.subscribe();
6666
}
6767

6868

6969
@Override
7070
public T get() throws InterruptedException {
71-
S result = this.promise.block();
71+
S result = this.monoProcessor.block();
7272
return adapt(result);
7373
}
7474

7575
@Override
76-
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
77-
Objects.requireNonNull(unit, "unit");
78-
S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert(
79-
timeout,
80-
unit)));
76+
public T get(long timeout, TimeUnit unit)
77+
throws InterruptedException, ExecutionException, TimeoutException {
78+
79+
Assert.notNull(unit);
80+
Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit));
81+
S result = this.monoProcessor.block(duration);
8182
return adapt(result);
8283
}
8384

@@ -86,18 +87,18 @@ public boolean cancel(boolean mayInterruptIfRunning) {
8687
if (isCancelled()) {
8788
return false;
8889
}
89-
this.promise.cancel();
90+
this.monoProcessor.cancel();
9091
return true;
9192
}
9293

9394
@Override
9495
public boolean isCancelled() {
95-
return this.promise.isCancelled();
96+
return this.monoProcessor.isCancelled();
9697
}
9798

9899
@Override
99100
public boolean isDone() {
100-
return this.promise.isTerminated();
101+
return this.monoProcessor.isTerminated();
101102
}
102103

103104
@Override
@@ -111,7 +112,6 @@ public void addCallback(SuccessCallback<? super T> successCallback, FailureCallb
111112
this.registry.addFailureCallback(failureCallback);
112113
}
113114

114-
115115
protected abstract T adapt(S result);
116116

117117
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,15 +19,14 @@
1919
import reactor.core.publisher.Mono;
2020

2121
/**
22-
* A Mono-to-ListenableFutureAdapter where the source and the target from
22+
* A Mono-to-ListenableFuture adapter where the source and the target from
2323
* the Promise and the ListenableFuture respectively are of the same type.
2424
*
2525
* @author Rossen Stoyanchev
2626
* @author Stephane Maldini
27-
* @since 4.0
27+
* @since 5.0
2828
*/
29-
class MonoToListenableFutureAdapter<T> extends
30-
AbstractMonoToListenableFutureAdapter<T, T> {
29+
class MonoToListenableFutureAdapter<T> extends AbstractMonoToListenableFutureAdapter<T, T> {
3130

3231

3332
public MonoToListenableFutureAdapter(Mono<T> mono) {

0 commit comments

Comments
 (0)