Skip to content

Commit 0290412

Browse files
committed
Add RSocketRequest.Builder in Spring Messaging
Prior to this commit, `RSocketRequester` would have a single `RSocketRequester.create` static method taking a fully built `RSocket` as an argument. Developers need to build an `RSocket` instance using the `RSocketFactory` and then use it to create a requester. To help developers set up a requester, this commit adds a new `RSocketRequester.Builder` interface and implementation. The `RSocket` building phase and codecs configuration are part of a single call chain. Subscribing to the returned `Mono<RSocketRequester>` will configure and connect to the remote RSocket server. This design should be improved in gh-22798, since we will need to support metadata in a broader fashion. Closes gh-22806
1 parent 900abfc commit 0290412

File tree

5 files changed

+257
-14
lines changed

5 files changed

+257
-14
lines changed

spring-messaging/spring-messaging.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies {
1616
optional(project(":spring-oxm"))
1717
optional("io.projectreactor.netty:reactor-netty")
1818
optional("io.rsocket:rsocket-core:${rsocketVersion}")
19+
optional("io.rsocket:rsocket-transport-netty:${rsocketVersion}")
1920
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
2021
optional("javax.xml.bind:jaxb-api:2.3.1")
2122
testCompile("javax.inject:javax.inject-tck:1")
@@ -29,7 +30,6 @@ dependencies {
2930
testCompile("org.apache.activemq:activemq-stomp:5.8.0")
3031
testCompile("io.projectreactor:reactor-test")
3132
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
32-
testCompile("io.rsocket:rsocket-transport-netty:${rsocketVersion}")
3333
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
3434
testCompile("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
3535
testCompile("org.xmlunit:xmlunit-matchers:2.6.2")
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.rsocket;
18+
19+
import java.net.URI;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.function.Consumer;
23+
24+
import io.rsocket.RSocketFactory;
25+
import io.rsocket.transport.ClientTransport;
26+
import io.rsocket.transport.netty.client.TcpClientTransport;
27+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
28+
import reactor.core.publisher.Mono;
29+
30+
import org.springframework.lang.Nullable;
31+
import org.springframework.util.MimeType;
32+
33+
/**
34+
* Default implementation of {@link RSocketRequester.Builder}.
35+
*
36+
* @author Brian Clozel
37+
* @since 5.2
38+
*/
39+
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
40+
41+
@Nullable
42+
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>();
43+
44+
@Nullable
45+
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
46+
47+
@Override
48+
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) {
49+
this.factoryConfigurers.add(configurer);
50+
return this;
51+
}
52+
53+
@Override
54+
public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) {
55+
this.strategiesConfigurers.add(configurer);
56+
return this;
57+
}
58+
59+
@Override
60+
public Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType) {
61+
return Mono.defer(() -> {
62+
RSocketStrategies.Builder strategiesBuilder = RSocketStrategies.builder();
63+
this.strategiesConfigurers.forEach(configurer -> configurer.accept(strategiesBuilder));
64+
RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect()
65+
.dataMimeType(dataMimeType.toString());
66+
this.factoryConfigurers.forEach(configurer -> configurer.accept(clientFactory));
67+
return clientFactory.transport(transport).start()
68+
.map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, strategiesBuilder.build()));
69+
});
70+
}
71+
72+
@Override
73+
public Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType) {
74+
return connect(TcpClientTransport.create(host, port), dataMimeType);
75+
}
76+
77+
@Override
78+
public Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType) {
79+
return connect(WebsocketClientTransport.create(uri), dataMimeType);
80+
}
81+
82+
}

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616

1717
package org.springframework.messaging.rsocket;
1818

19+
import java.net.URI;
20+
import java.util.function.Consumer;
21+
1922
import io.rsocket.RSocket;
23+
import io.rsocket.RSocketFactory;
24+
import io.rsocket.transport.ClientTransport;
2025
import org.reactivestreams.Publisher;
2126
import reactor.core.publisher.Flux;
2227
import reactor.core.publisher.Mono;
@@ -32,6 +37,7 @@
3237
* methods specify routing and other metadata.
3338
*
3439
* @author Rossen Stoyanchev
40+
* @author Brian Clozel
3541
* @since 5.2
3642
*/
3743
public interface RSocketRequester {
@@ -55,6 +61,61 @@ static RSocketRequester create(RSocket rsocket, @Nullable MimeType dataMimeType,
5561
return new DefaultRSocketRequester(rsocket, dataMimeType, strategies);
5662
}
5763

64+
/**
65+
* Obtain a {@code RSocketRequester} builder.
66+
*/
67+
static RSocketRequester.Builder builder() {
68+
return new DefaultRSocketRequesterBuilder();
69+
}
70+
71+
/**
72+
* A mutable builder for creating a client {@link RSocketRequester}.
73+
*/
74+
interface Builder {
75+
76+
/**
77+
* Configure the client {@code RSocketFactory}. This is useful for
78+
* customizing protocol options and add RSocket plugins.
79+
* @param configurer the configurer to apply
80+
*/
81+
RSocketRequester.Builder rsocketFactory(
82+
Consumer<RSocketFactory.ClientRSocketFactory> configurer);
83+
84+
/**
85+
* Configure the builder for {@link RSocketStrategies}.
86+
* <p>The builder starts with an empty {@code RSocketStrategies}.
87+
* @param configurer the configurer to apply
88+
*/
89+
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
90+
91+
/**
92+
* Configure the {@code ClientTransport} for the RSocket connection
93+
* and connect to the RSocket server
94+
* @param transport the chosen client transport
95+
* @return a mono containing the connected {@code RSocketRequester}
96+
*/
97+
Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType);
98+
99+
/**
100+
* Connect to the RSocket server over TCP transport using the
101+
* provided connection parameters
102+
* @param host the RSocket server host
103+
* @param port the RSocket server port
104+
* @param dataMimeType the data MimeType
105+
* @return a mono containing the connected {@code RSocketRequester}
106+
*/
107+
Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType);
108+
109+
/**
110+
* Connect to the RSocket server over WebSocket transport using the
111+
* provided connection parameters
112+
* @param uri the RSocket server endpoint URI
113+
* @param dataMimeType the data MimeType
114+
* @return a mono containing the connected {@code RSocketRequester}
115+
*/
116+
Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType);
117+
118+
}
58119

59120
// For now we treat metadata as a simple string that is the route.
60121
// This will change after the resolution of:
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.rsocket;
18+
19+
import java.util.function.Consumer;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.rsocket.DuplexConnection;
23+
import io.rsocket.RSocketFactory;
24+
import io.rsocket.transport.ClientTransport;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.reactivestreams.Publisher;
28+
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.Mono;
30+
31+
import org.springframework.util.MimeTypeUtils;
32+
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.ArgumentMatchers.anyInt;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.verify;
37+
import static org.mockito.Mockito.verifyZeroInteractions;
38+
import static org.mockito.Mockito.when;
39+
40+
/**
41+
* Unit tests for {@link DefaultRSocketRequesterBuilder}.
42+
*
43+
* @author Brian Clozel
44+
*/
45+
public class DefaultRSocketRequesterBuilderTests {
46+
47+
private ClientTransport transport;
48+
49+
@Before
50+
public void setup() {
51+
this.transport = mock(ClientTransport.class);
52+
when(this.transport.connect(anyInt())).thenReturn(Mono.just(new MockConnection()));
53+
}
54+
55+
@SuppressWarnings("unchecked")
56+
@Test
57+
public void shouldApplyCustomizationsAtSubscription() {
58+
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
59+
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
60+
Mono<RSocketRequester> requester = RSocketRequester.builder()
61+
.rsocketFactory(factoryConfigurer)
62+
.rsocketStrategies(strategiesConfigurer)
63+
.connect(this.transport, MimeTypeUtils.APPLICATION_JSON);
64+
verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer);
65+
}
66+
67+
@SuppressWarnings("unchecked")
68+
@Test
69+
public void shouldApplyCustomizations() {
70+
Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = mock(Consumer.class);
71+
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
72+
RSocketRequester requester = RSocketRequester.builder()
73+
.rsocketFactory(factoryConfigurer)
74+
.rsocketStrategies(strategiesConfigurer)
75+
.connect(this.transport, MimeTypeUtils.APPLICATION_JSON)
76+
.block();
77+
verify(this.transport).connect(anyInt());
78+
verify(factoryConfigurer).accept(any(RSocketFactory.ClientRSocketFactory.class));
79+
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
80+
}
81+
82+
static class MockConnection implements DuplexConnection {
83+
84+
@Override
85+
public Mono<Void> send(Publisher<ByteBuf> frames) {
86+
return Mono.empty();
87+
}
88+
89+
@Override
90+
public Flux<ByteBuf> receive() {
91+
return Flux.empty();
92+
}
93+
94+
@Override
95+
public Mono<Void> onClose() {
96+
return Mono.empty();
97+
}
98+
99+
@Override
100+
public void dispose() {
101+
102+
}
103+
}
104+
105+
}

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
import java.time.Duration;
2020

2121
import io.netty.buffer.PooledByteBufAllocator;
22-
import io.rsocket.RSocket;
2322
import io.rsocket.RSocketFactory;
2423
import io.rsocket.frame.decoder.PayloadDecoder;
25-
import io.rsocket.transport.netty.client.TcpClientTransport;
2624
import io.rsocket.transport.netty.server.CloseableChannel;
2725
import io.rsocket.transport.netty.server.TcpServerTransport;
2826
import org.junit.AfterClass;
@@ -59,8 +57,6 @@ public class RSocketClientToServerIntegrationTests {
5957

6058
private static FireAndForgetCountingInterceptor interceptor = new FireAndForgetCountingInterceptor();
6159

62-
private static RSocket client;
63-
6460
private static RSocketRequester requester;
6561

6662

@@ -77,20 +73,19 @@ public static void setupOnce() {
7773
.start()
7874
.block();
7975

80-
client = RSocketFactory.connect()
81-
.dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
82-
.frameDecoder(PayloadDecoder.ZERO_COPY)
83-
.transport(TcpClientTransport.create("localhost", 7000))
84-
.start()
76+
requester = RSocketRequester.builder()
77+
.rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY))
78+
.rsocketStrategies(strategies -> strategies
79+
.decoder(StringDecoder.allMimeTypes())
80+
.encoder(CharSequenceEncoder.allMimeTypes())
81+
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)))
82+
.connectTcp("localhost", 7000, MimeTypeUtils.TEXT_PLAIN)
8583
.block();
86-
87-
requester = RSocketRequester.create(
88-
client, MimeTypeUtils.TEXT_PLAIN, context.getBean(RSocketStrategies.class));
8984
}
9085

9186
@AfterClass
9287
public static void tearDownOnce() {
93-
client.dispose();
88+
requester.rsocket().dispose();
9489
server.dispose();
9590
}
9691

0 commit comments

Comments
 (0)