Skip to content

Commit 0b70862

Browse files
committed
Fix RSocket websocket config with WebFlux
In the case of a WebFlux + RSocket over websocket setup, the RSocket auto-configuration would not set up the required routes; only the websocket endpoint for RSocket would be available, overriding the handler configured for WebFlux. This commit introduces `NettyRouteProvider`. Components implementing that interface can contribute HTTP routes to the Reactor Netty server being built. * if none is provided, the regular handler setup is used * if one or more routes are provided, routes are sorted and added before the WebFlux handler (acting as a default) Fixes gh-16826
1 parent 45507c4 commit 0b70862

File tree

8 files changed

+247
-20
lines changed

8 files changed

+247
-20
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/rsocket/RSocketServerAutoConfiguration.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@
3838
import org.springframework.boot.rsocket.server.RSocketServerBootstrap;
3939
import org.springframework.boot.rsocket.server.RSocketServerFactory;
4040
import org.springframework.boot.rsocket.server.ServerRSocketFactoryCustomizer;
41-
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
42-
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
4341
import org.springframework.context.annotation.Bean;
4442
import org.springframework.context.annotation.Conditional;
4543
import org.springframework.context.annotation.Configuration;
@@ -70,12 +68,11 @@ public class RSocketServerAutoConfiguration {
7068
static class WebFluxServerAutoConfiguration {
7169

7270
@Bean
73-
public WebServerFactoryCustomizer<NettyReactiveWebServerFactory> rSocketWebsocketCustomizer(
71+
public RSocketWebSocketNettyRouteProvider rSocketWebsocketRouteProvider(
7472
RSocketProperties properties,
7573
MessageHandlerAcceptor messageHandlerAcceptor) {
76-
RSocketNettyServerCustomizer customizer = new RSocketNettyServerCustomizer(
74+
return new RSocketWebSocketNettyRouteProvider(
7775
properties.getServer().getMappingPath(), messageHandlerAcceptor);
78-
return (factory) -> factory.addServerCustomizers(customizer);
7976
}
8077

8178
}
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,34 @@
1919
import io.rsocket.RSocketFactory;
2020
import io.rsocket.transport.ServerTransport;
2121
import io.rsocket.transport.netty.server.WebsocketRouteTransport;
22-
import reactor.netty.http.server.HttpServer;
22+
import reactor.netty.http.server.HttpServerRoutes;
2323

24-
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
24+
import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
2525
import org.springframework.messaging.rsocket.MessageHandlerAcceptor;
2626

2727
/**
28-
* {@link NettyServerCustomizer} that configures an RSocket Websocket endpoint.
28+
* {@link NettyRouteProvider} that configures an RSocket Websocket endpoint.
2929
*
3030
* @author Brian Clozel
3131
*/
32-
class RSocketNettyServerCustomizer implements NettyServerCustomizer {
32+
class RSocketWebSocketNettyRouteProvider implements NettyRouteProvider {
3333

3434
private final String mappingPath;
3535

3636
private final MessageHandlerAcceptor messageHandlerAcceptor;
3737

38-
RSocketNettyServerCustomizer(String mappingPath,
38+
RSocketWebSocketNettyRouteProvider(String mappingPath,
3939
MessageHandlerAcceptor messageHandlerAcceptor) {
4040
this.mappingPath = mappingPath;
4141
this.messageHandlerAcceptor = messageHandlerAcceptor;
4242
}
4343

4444
@Override
45-
public HttpServer apply(HttpServer httpServer) {
45+
public HttpServerRoutes apply(HttpServerRoutes httpServerRoutes) {
4646
ServerTransport.ConnectionAcceptor acceptor = RSocketFactory.receive()
4747
.acceptor(this.messageHandlerAcceptor).toConnectionAcceptor();
48-
return httpServer.route((routes) -> routes.ws(this.mappingPath,
49-
WebsocketRouteTransport.newHandler(acceptor)));
48+
return httpServerRoutes.ws(this.mappingPath,
49+
WebsocketRouteTransport.newHandler(acceptor));
5050
}
5151

5252
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/web/reactive/ReactiveWebServerFactoryConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.boot.web.embedded.jetty.JettyReactiveWebServerFactory;
2828
import org.springframework.boot.web.embedded.jetty.JettyServerCustomizer;
2929
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
30+
import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
3031
import org.springframework.boot.web.embedded.tomcat.TomcatConnectorCustomizer;
3132
import org.springframework.boot.web.embedded.tomcat.TomcatContextCustomizer;
3233
import org.springframework.boot.web.embedded.tomcat.TomcatProtocolHandlerCustomizer;
@@ -65,9 +66,12 @@ public ReactorResourceFactory reactorServerResourceFactory() {
6566

6667
@Bean
6768
public NettyReactiveWebServerFactory nettyReactiveWebServerFactory(
68-
ReactorResourceFactory resourceFactory) {
69+
ReactorResourceFactory resourceFactory,
70+
ObjectProvider<NettyRouteProvider> routes) {
6971
NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory();
7072
serverFactory.setResourceFactory(resourceFactory);
73+
routes.orderedStream()
74+
.forEach((route) -> serverFactory.addRouteProviders(route));
7175
return serverFactory;
7276
}
7377

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/rsocket/RSocketServerAutoConfigurationTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ public void shouldCreateDefaultBeansForReactiveWebApp() {
7777
.withPropertyValues("spring.rsocket.server.transport=websocket",
7878
"spring.rsocket.server.mapping-path=/rsocket")
7979
.run((context) -> assertThat(context)
80-
.getBeanNames(WebServerFactoryCustomizer.class).hasSize(1)
81-
.containsOnly("rSocketWebsocketCustomizer"));
80+
.hasSingleBean(RSocketWebSocketNettyRouteProvider.class));
8281
}
8382

8483
@Test
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2012-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.rsocket;
18+
19+
import java.net.URI;
20+
import java.time.Duration;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import org.springframework.boot.autoconfigure.AutoConfigurations;
25+
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
26+
import org.springframework.boot.autoconfigure.http.codec.CodecsAutoConfiguration;
27+
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
28+
import org.springframework.boot.autoconfigure.web.reactive.HttpHandlerAutoConfiguration;
29+
import org.springframework.boot.autoconfigure.web.reactive.WebFluxAutoConfiguration;
30+
import org.springframework.boot.autoconfigure.web.reactive.error.ErrorWebFluxAutoConfiguration;
31+
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
32+
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
33+
import org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext;
34+
import org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext;
35+
import org.springframework.boot.web.server.WebServer;
36+
import org.springframework.context.ApplicationContext;
37+
import org.springframework.context.annotation.Bean;
38+
import org.springframework.context.annotation.Configuration;
39+
import org.springframework.http.MediaType;
40+
import org.springframework.messaging.handler.annotation.MessageMapping;
41+
import org.springframework.messaging.rsocket.RSocketRequester;
42+
import org.springframework.stereotype.Controller;
43+
import org.springframework.test.web.reactive.server.WebTestClient;
44+
import org.springframework.web.bind.annotation.GetMapping;
45+
import org.springframework.web.bind.annotation.ResponseBody;
46+
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
49+
/**
50+
* Tests for {@link RSocketWebSocketNettyRouteProvider}.
51+
*
52+
* @author Brian Clozel
53+
*/
54+
55+
public class RSocketWebSocketNettyRouteProviderTests {
56+
57+
@Test
58+
public void webEndpointsShouldWork() throws Exception {
59+
new ReactiveWebApplicationContextRunner(
60+
AnnotationConfigReactiveWebServerApplicationContext::new)
61+
.withConfiguration(
62+
AutoConfigurations.of(HttpHandlerAutoConfiguration.class,
63+
WebFluxAutoConfiguration.class,
64+
ErrorWebFluxAutoConfiguration.class,
65+
PropertyPlaceholderAutoConfiguration.class,
66+
JacksonAutoConfiguration.class,
67+
CodecsAutoConfiguration.class,
68+
RSocketStrategiesAutoConfiguration.class,
69+
RSocketServerAutoConfiguration.class,
70+
RSocketMessagingAutoConfiguration.class,
71+
RSocketRequesterAutoConfiguration.class))
72+
.withUserConfiguration(WebConfiguration.class)
73+
.withPropertyValues("spring.rsocket.server.transport=websocket",
74+
"spring.rsocket.server.mapping-path=/rsocket")
75+
.run((context) -> {
76+
ReactiveWebServerApplicationContext serverContext = (ReactiveWebServerApplicationContext) context
77+
.getSourceApplicationContext();
78+
RSocketRequester requester = createRSocketRequester(context,
79+
serverContext.getWebServer());
80+
TestProtocol rsocketResponse = requester.route("websocket")
81+
.data(new TestProtocol("rsocket"))
82+
.retrieveMono(TestProtocol.class)
83+
.block(Duration.ofSeconds(3));
84+
assertThat(rsocketResponse.getName()).isEqualTo("rsocket");
85+
WebTestClient client = createWebTestClient(
86+
serverContext.getWebServer());
87+
client.get().uri("/protocol").exchange().expectStatus().isOk()
88+
.expectBody().jsonPath("name", "http");
89+
});
90+
}
91+
92+
private WebTestClient createWebTestClient(WebServer server) {
93+
return WebTestClient.bindToServer()
94+
.baseUrl("http://localhost:" + server.getPort()).build();
95+
}
96+
97+
private RSocketRequester createRSocketRequester(ApplicationContext context,
98+
WebServer server) {
99+
int port = server.getPort();
100+
RSocketRequester.Builder builder = context
101+
.getBean(RSocketRequester.Builder.class);
102+
return builder.connectWebSocket(URI.create("ws://localhost:" + port + "/rsocket"))
103+
.block();
104+
}
105+
106+
@Configuration(proxyBeanMethods = false)
107+
static class WebConfiguration {
108+
109+
@Bean
110+
public WebController webController() {
111+
return new WebController();
112+
}
113+
114+
@Bean
115+
public NettyReactiveWebServerFactory customServerFactory(
116+
RSocketWebSocketNettyRouteProvider routeProvider) {
117+
NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory(
118+
0);
119+
serverFactory.addRouteProviders(routeProvider);
120+
return serverFactory;
121+
}
122+
123+
}
124+
125+
@Controller
126+
static class WebController {
127+
128+
@GetMapping(path = "/protocol", produces = MediaType.APPLICATION_JSON_VALUE)
129+
@ResponseBody
130+
public TestProtocol testWebEndpoint() {
131+
return new TestProtocol("http");
132+
}
133+
134+
@MessageMapping("websocket")
135+
public TestProtocol testRSocketEndpoint() {
136+
return new TestProtocol("rsocket");
137+
}
138+
139+
}
140+
141+
static class TestProtocol {
142+
143+
private String name;
144+
145+
TestProtocol() {
146+
}
147+
148+
TestProtocol(String name) {
149+
this.name = name;
150+
}
151+
152+
public String getName() {
153+
return this.name;
154+
}
155+
156+
public void setName(String name) {
157+
this.name = name;
158+
}
159+
160+
}
161+
162+
}

spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/embedded/netty/NettyReactiveWebServerFactory.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class NettyReactiveWebServerFactory extends AbstractReactiveWebServerFact
4545

4646
private List<NettyServerCustomizer> serverCustomizers = new ArrayList<>();
4747

48+
private List<NettyRouteProvider> routeProviders = new ArrayList<>();
49+
4850
private Duration lifecycleTimeout;
4951

5052
private boolean useForwardHeaders;
@@ -63,7 +65,10 @@ public WebServer getWebServer(HttpHandler httpHandler) {
6365
HttpServer httpServer = createHttpServer();
6466
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(
6567
httpHandler);
66-
return new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout);
68+
NettyWebServer webServer = new NettyWebServer(httpServer, handlerAdapter,
69+
this.lifecycleTimeout);
70+
webServer.setRouteProviders(this.routeProviders);
71+
return webServer;
6772
}
6873

6974
/**
@@ -95,6 +100,16 @@ public void addServerCustomizers(NettyServerCustomizer... serverCustomizers) {
95100
this.serverCustomizers.addAll(Arrays.asList(serverCustomizers));
96101
}
97102

103+
/**
104+
* Add {@link NettyRouteProvider}s that should be applied, in order, before the the
105+
* handler for the Spring application.
106+
* @param routeProviders the route providers to add
107+
*/
108+
public void addRouteProviders(NettyRouteProvider... routeProviders) {
109+
Assert.notNull(routeProviders, "NettyRouteProvider must not be null");
110+
this.routeProviders.addAll(Arrays.asList(routeProviders));
111+
}
112+
98113
/**
99114
* Set the maximum amount of time that should be waited when starting or stopping the
100115
* server.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2012-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.web.embedded.netty;
18+
19+
import java.util.function.Function;
20+
21+
import reactor.netty.http.server.HttpServerRoutes;
22+
23+
/**
24+
* Function that can add new routes to an {@link HttpServerRoutes} instance.
25+
*
26+
* @author Brian Clozel
27+
* @see NettyReactiveWebServerFactory
28+
* @since 2.2.0
29+
*/
30+
@FunctionalInterface
31+
public interface NettyRouteProvider extends Function<HttpServerRoutes, HttpServerRoutes> {
32+
33+
}

spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/embedded/netty/NettyWebServer.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.boot.web.embedded.netty;
1818

1919
import java.time.Duration;
20+
import java.util.Collections;
21+
import java.util.List;
2022

2123
import org.apache.commons.logging.Log;
2224
import org.apache.commons.logging.LogFactory;
@@ -50,6 +52,8 @@ public class NettyWebServer implements WebServer {
5052

5153
private final Duration lifecycleTimeout;
5254

55+
private List<NettyRouteProvider> routeProviders = Collections.emptyList();
56+
5357
private DisposableServer disposableServer;
5458

5559
public NettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
@@ -61,6 +65,10 @@ public NettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAd
6165
this.lifecycleTimeout = lifecycleTimeout;
6266
}
6367

68+
public void setRouteProviders(List<NettyRouteProvider> routeProviders) {
69+
this.routeProviders = routeProviders;
70+
}
71+
6472
@Override
6573
public void start() throws WebServerException {
6674
if (this.disposableServer == null) {
@@ -80,11 +88,20 @@ public void start() throws WebServerException {
8088
}
8189

8290
private DisposableServer startHttpServer() {
91+
HttpServer server = this.httpServer;
92+
if (this.routeProviders.isEmpty()) {
93+
server = server.handle(this.handlerAdapter);
94+
}
95+
else {
96+
server = server.route((routes) -> {
97+
this.routeProviders.forEach((provider) -> provider.apply(routes));
98+
routes.route((r) -> true, this.handlerAdapter);
99+
});
100+
}
83101
if (this.lifecycleTimeout != null) {
84-
return this.httpServer.handle(this.handlerAdapter)
85-
.bindNow(this.lifecycleTimeout);
102+
return server.bindNow(this.lifecycleTimeout);
86103
}
87-
return this.httpServer.handle(this.handlerAdapter).bindNow();
104+
return server.bindNow();
88105
}
89106

90107
private ChannelBindException findBindException(Exception ex) {

0 commit comments

Comments
 (0)