18
18
19
19
import java .net .InetSocketAddress ;
20
20
import java .nio .channels .ClosedChannelException ;
21
- import java .time .Duration ;
22
21
import java .util .Arrays ;
23
22
import java .util .concurrent .Callable ;
24
23
38
37
import org .junit .jupiter .api .Test ;
39
38
import org .mockito .InOrder ;
40
39
import reactor .core .publisher .Mono ;
40
+ import reactor .netty .http .client .HttpClient ;
41
41
import reactor .netty .tcp .TcpClient ;
42
42
import reactor .test .StepVerifier ;
43
43
44
44
import org .springframework .boot .rsocket .server .RSocketServer ;
45
- import org .springframework .boot .rsocket .server .RSocketServerCustomizer ;
46
45
import org .springframework .boot .rsocket .server .RSocketServer .Transport ;
46
+ import org .springframework .boot .rsocket .server .RSocketServerCustomizer ;
47
47
import org .springframework .boot .web .server .Ssl ;
48
48
import org .springframework .core .codec .CharSequenceEncoder ;
49
49
import org .springframework .core .codec .StringDecoder ;
55
55
56
56
import static org .assertj .core .api .Assertions .assertThat ;
57
57
import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
58
- import static org .assertj .core .api .Assertions .assertThatThrownBy ;
59
58
import static org .mockito .ArgumentMatchers .any ;
60
59
import static org .mockito .BDDMockito .will ;
61
60
import static org .mockito .Mockito .inOrder ;
@@ -74,10 +73,11 @@ class NettyRSocketServerFactoryTests {
74
73
75
74
private RSocketRequester requester ;
76
75
77
- private static final Duration TIMEOUT = Duration .ofSeconds (3 );
78
-
79
76
@ AfterEach
80
77
void tearDown () {
78
+ if (this .requester != null ) {
79
+ this .requester .rsocketClient ().dispose ();
80
+ }
81
81
if (this .server != null ) {
82
82
try {
83
83
this .server .stop ();
@@ -86,9 +86,6 @@ void tearDown() {
86
86
// Ignore
87
87
}
88
88
}
89
- if (this .requester != null ) {
90
- this .requester .rsocketClient ().dispose ();
91
- }
92
89
}
93
90
94
91
private NettyRSocketServerFactory getFactory () {
@@ -105,11 +102,9 @@ void specificPort() {
105
102
this .server .start ();
106
103
return port ;
107
104
});
108
- this .requester = createRSocketTcpClient (false );
109
- String payload = "test payload" ;
110
- String response = this .requester .route ("test" ).data (payload ).retrieveMono (String .class ).block (TIMEOUT );
105
+ this .requester = createRSocketTcpClient ();
111
106
assertThat (this .server .address ().getPort ()).isEqualTo (specificPort );
112
- assertThat ( response ). isEqualTo ( payload );
107
+ checkEchoRequest ( );
113
108
}
114
109
115
110
@ Test
@@ -118,10 +113,8 @@ void websocketTransport() {
118
113
factory .setTransport (RSocketServer .Transport .WEBSOCKET );
119
114
this .server = factory .create (new EchoRequestResponseAcceptor ());
120
115
this .server .start ();
121
- this .requester = createRSocketWebSocketClient (false );
122
- String payload = "test payload" ;
123
- String response = this .requester .route ("test" ).data (payload ).retrieveMono (String .class ).block (TIMEOUT );
124
- assertThat (response ).isEqualTo (payload );
116
+ this .requester = createRSocketWebSocketClient ();
117
+ checkEchoRequest ();
125
118
}
126
119
127
120
@ Test
@@ -133,10 +126,8 @@ void websocketTransportWithReactorResource() {
133
126
factory .setResourceFactory (resourceFactory );
134
127
this .server = factory .create (new EchoRequestResponseAcceptor ());
135
128
this .server .start ();
136
- this .requester = createRSocketWebSocketClient (false );
137
- String payload = "test payload" ;
138
- String response = this .requester .route ("test" ).data (payload ).retrieveMono (String .class ).block (TIMEOUT );
139
- assertThat (response ).isEqualTo (payload );
129
+ this .requester = createRSocketWebSocketClient ();
130
+ checkEchoRequest ();
140
131
}
141
132
142
133
@ Test
@@ -176,6 +167,12 @@ void websocketTransportBasicSslFromFileSystem() {
176
167
testBasicSslWithKeyStore ("src/test/resources/test.jks" , "password" , Transport .WEBSOCKET );
177
168
}
178
169
170
+ private void checkEchoRequest () {
171
+ String payload = "test payload" ;
172
+ Mono <String > response = this .requester .route ("test" ).data (payload ).retrieveMono (String .class );
173
+ StepVerifier .create (response ).expectNext (payload ).verifyComplete ();
174
+ }
175
+
179
176
private void testBasicSslWithKeyStore (String keyStore , String keyPassword , Transport transport ) {
180
177
NettyRSocketServerFactory factory = getFactory ();
181
178
factory .setTransport (transport );
@@ -185,11 +182,9 @@ private void testBasicSslWithKeyStore(String keyStore, String keyPassword, Trans
185
182
factory .setSsl (ssl );
186
183
this .server = factory .create (new EchoRequestResponseAcceptor ());
187
184
this .server .start ();
188
- this .requester = (transport == Transport .TCP ) ? createRSocketTcpClient (true )
189
- : createRSocketWebSocketClient (true );
190
- String payload = "test payload" ;
191
- Mono <String > responseMono = this .requester .route ("test" ).data (payload ).retrieveMono (String .class );
192
- StepVerifier .create (responseMono ).expectNext (payload ).verifyComplete ();
185
+ this .requester = (transport == Transport .TCP ) ? createSecureRSocketTcpClient ()
186
+ : createSecureRSocketWebSocketClient ();
187
+ checkEchoRequest ();
193
188
}
194
189
195
190
@ Test
@@ -202,48 +197,54 @@ void tcpTransportSslRejectsInsecureClient() {
202
197
factory .setSsl (ssl );
203
198
this .server = factory .create (new EchoRequestResponseAcceptor ());
204
199
this .server .start ();
205
- this .requester = createRSocketTcpClient (false );
200
+ this .requester = createRSocketTcpClient ();
206
201
String payload = "test payload" ;
207
202
Mono <String > responseMono = this .requester .route ("test" ).data (payload ).retrieveMono (String .class );
208
203
StepVerifier .create (responseMono )
209
204
.verifyErrorSatisfies ((ex ) -> assertThatExceptionOfType (ClosedChannelException .class ));
210
205
}
211
206
212
- @ Test
213
- void websocketTransportSslRejectsInsecureClient () {
214
- NettyRSocketServerFactory factory = getFactory ();
215
- factory .setTransport (Transport .WEBSOCKET );
216
- Ssl ssl = new Ssl ();
217
- ssl .setKeyStore ("classpath:test.jks" );
218
- ssl .setKeyPassword ("password" );
219
- factory .setSsl (ssl );
220
- this .server = factory .create (new EchoRequestResponseAcceptor ());
221
- this .server .start ();
222
- // For WebSocket, the SSL failure results in a hang on the initial connect call
223
- assertThatThrownBy (() -> createRSocketWebSocketClient (false )).isInstanceOf (IllegalStateException .class )
224
- .hasStackTraceContaining ("Timeout on blocking read" );
207
+ private RSocketRequester createRSocketTcpClient () {
208
+ return createRSocketRequesterBuilder ().transport (TcpClientTransport .create (createTcpClient ()));
209
+ }
210
+
211
+ private RSocketRequester createRSocketWebSocketClient () {
212
+ return createRSocketRequesterBuilder ().transport (WebsocketClientTransport .create (createHttpClient (), "/" ));
225
213
}
226
214
227
- private RSocketRequester createRSocketTcpClient (boolean ssl ) {
228
- TcpClient tcpClient = createTcpClient (ssl );
229
- return createRSocketRequesterBuilder ().connect (TcpClientTransport .create (tcpClient )).block (TIMEOUT );
215
+ private RSocketRequester createSecureRSocketTcpClient () {
216
+ return createRSocketRequesterBuilder ().transport (TcpClientTransport .create (createSecureTcpClient ()));
230
217
}
231
218
232
- private RSocketRequester createRSocketWebSocketClient ( boolean ssl ) {
233
- TcpClient tcpClient = createTcpClient ( ssl );
234
- return createRSocketRequesterBuilder (). connect (WebsocketClientTransport .create (tcpClient )). block ( TIMEOUT );
219
+ private RSocketRequester createSecureRSocketWebSocketClient ( ) {
220
+ return createRSocketRequesterBuilder ()
221
+ . transport (WebsocketClientTransport .create (createSecureHttpClient (), "/" ) );
235
222
}
236
223
237
- private TcpClient createTcpClient (boolean ssl ) {
224
+ private HttpClient createSecureHttpClient () {
225
+ HttpClient httpClient = createHttpClient ();
226
+ SslContextBuilder builder = SslContextBuilder .forClient ().sslProvider (SslProvider .JDK )
227
+ .trustManager (InsecureTrustManagerFactory .INSTANCE );
228
+ return httpClient .secure ((spec ) -> spec .sslContext (builder ));
229
+ }
230
+
231
+ private HttpClient createHttpClient () {
238
232
Assertions .assertThat (this .server ).isNotNull ();
239
233
InetSocketAddress address = this .server .address ();
240
- TcpClient tcpClient = TcpClient .create ().host (address .getHostName ()).port (address .getPort ());
241
- if (ssl ) {
242
- SslContextBuilder builder = SslContextBuilder .forClient ().sslProvider (SslProvider .JDK )
243
- .trustManager (InsecureTrustManagerFactory .INSTANCE );
244
- tcpClient = tcpClient .secure ((spec ) -> spec .sslContext (builder ));
245
- }
246
- return tcpClient ;
234
+ return HttpClient .create ().host (address .getHostName ()).port (address .getPort ());
235
+ }
236
+
237
+ private TcpClient createSecureTcpClient () {
238
+ TcpClient tcpClient = createTcpClient ();
239
+ SslContextBuilder builder = SslContextBuilder .forClient ().sslProvider (SslProvider .JDK )
240
+ .trustManager (InsecureTrustManagerFactory .INSTANCE );
241
+ return tcpClient .secure ((spec ) -> spec .sslContext (builder ));
242
+ }
243
+
244
+ private TcpClient createTcpClient () {
245
+ Assertions .assertThat (this .server ).isNotNull ();
246
+ InetSocketAddress address = this .server .address ();
247
+ return TcpClient .create ().host (address .getHostName ()).port (address .getPort ());
247
248
}
248
249
249
250
private RSocketRequester .Builder createRSocketRequesterBuilder () {
0 commit comments