Skip to content

Commit 6cd92c6

Browse files
committed
Refactoring in reactive WebSocketSession hierarchy
Expose bufferFactory() at the WebSocketSession level for creating payloads like ReactiveHttpOutputMessage does. Promote getId(), getUri(), and bufferFactory() to the base class WebSocketSessionSupport.
1 parent 140ff7c commit 6cd92c6

12 files changed

+70
-58
lines changed

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import reactor.core.publisher.Flux;
2222
import reactor.core.publisher.Mono;
2323

24+
import org.springframework.core.io.buffer.DataBufferFactory;
25+
2426
/**
2527
* Representation for a WebSocket session.
2628
*
@@ -39,6 +41,12 @@ public interface WebSocketSession {
3941
*/
4042
URI getUri();
4143

44+
/**
45+
* Return a {@link DataBufferFactory} that can be used for creating message payloads.
46+
* @return a buffer factory
47+
*/
48+
DataBufferFactory bufferFactory();
49+
4250
/**
4351
* Get the flux of incoming messages.
4452
*/

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.Mono;
2626

27+
import org.springframework.core.io.buffer.DataBufferFactory;
2728
import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
2829
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
29-
import org.springframework.util.Assert;
3030
import org.springframework.web.reactive.socket.CloseStatus;
3131
import org.springframework.web.reactive.socket.WebSocketMessage;
3232
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
@@ -48,35 +48,17 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
4848
private static final int RECEIVE_BUFFER_SIZE = 8192;
4949

5050

51-
private final String id;
52-
53-
private final URI uri;
54-
5551
private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher();
5652

5753
private volatile WebSocketSendProcessor sendProcessor;
5854

5955
private final AtomicBoolean sendCalled = new AtomicBoolean();
6056

6157

62-
public AbstractListenerWebSocketSession(T delegate, String id, URI uri) {
63-
super(delegate);
64-
Assert.notNull(id, "'id' is required.");
65-
Assert.notNull(uri, "'uri' is required.");
66-
this.id = id;
67-
this.uri = uri;
68-
}
69-
70-
71-
@Override
72-
public String getId() {
73-
return this.id;
58+
public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) {
59+
super(delegate, id, uri, bufferFactory);
7460
}
7561

76-
@Override
77-
public URI getUri() {
78-
return this.uri;
79-
}
8062

8163
protected WebSocketSendProcessor getSendProcessor() {
8264
return this.sendProcessor;

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public JettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpRespons
6363

6464
@OnWebSocketConnect
6565
public void onWebSocketConnect(Session session) {
66-
this.session = new JettyWebSocketSession(session);
66+
this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory());
6767

6868
HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
6969
getDelegate().handle(this.session).subscribe(subscriber);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
package org.springframework.web.reactive.socket.adapter;
1818

1919
import java.io.IOException;
20+
import java.net.URI;
2021
import java.nio.ByteBuffer;
2122
import java.nio.charset.StandardCharsets;
2223

2324
import org.eclipse.jetty.websocket.api.Session;
2425
import org.eclipse.jetty.websocket.api.WriteCallback;
2526
import reactor.core.publisher.Mono;
2627

28+
import org.springframework.core.io.buffer.DataBufferFactory;
2729
import org.springframework.util.ObjectUtils;
2830
import org.springframework.web.reactive.socket.CloseStatus;
2931
import org.springframework.web.reactive.socket.WebSocketMessage;
@@ -38,9 +40,9 @@
3840
*/
3941
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
4042

41-
public JettyWebSocketSession(Session session) {
42-
super(session, ObjectUtils.getIdentityHexString(session),
43-
session.getUpgradeRequest().getRequestURI());
43+
44+
public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) {
45+
super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory);
4446
}
4547

4648

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
import org.springframework.core.io.buffer.NettyDataBuffer;
3333
import org.springframework.core.io.buffer.NettyDataBufferFactory;
34-
import org.springframework.util.Assert;
3534
import org.springframework.util.ObjectUtils;
3635
import org.springframework.web.reactive.socket.WebSocketMessage;
3736
import org.springframework.web.reactive.socket.WebSocketSession;
@@ -55,32 +54,16 @@ public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSu
5554
}
5655

5756

58-
protected final String id;
59-
60-
protected final URI uri;
61-
62-
protected final NettyDataBufferFactory bufferFactory;
63-
64-
65-
protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) {
66-
super(delegate);
67-
Assert.notNull(uri, "'uri' is required.");
68-
Assert.notNull(uri, "'bufferFactory' is required.");
69-
this.uri = uri;
70-
this.bufferFactory = factory;
71-
this.id = ObjectUtils.getIdentityHexString(getDelegate());
57+
protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) {
58+
super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory);
7259
}
7360

7461

7562
@Override
76-
public String getId() {
77-
return this.id;
63+
public NettyDataBufferFactory bufferFactory() {
64+
return (NettyDataBufferFactory) super.bufferFactory();
7865
}
7966

80-
@Override
81-
public URI getUri() {
82-
return this.uri;
83-
}
8467

8568
protected Flux<WebSocketMessage> toMessageFlux(Flux<WebSocketFrame> frameFlux) {
8669
return frameFlux
@@ -94,11 +77,11 @@ protected Flux<WebSocketMessage> toMessageFlux(Flux<WebSocketFrame> frameFlux) {
9477
private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
9578
Class<?> frameType = frames.get(0).getClass();
9679
if (frames.size() == 1) {
97-
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
80+
NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content());
9881
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
9982
}
10083
return frames.stream()
101-
.map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
84+
.map(socketFrame -> bufferFactory().wrap(socketFrame.content()))
10285
.reduce(NettyDataBuffer::write)
10386
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
10487
.get();

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
public class ReactorNettyWebSocketSession
4242
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
4343

44-
protected ReactorNettyWebSocketSession(WebsocketInbound inbound,
45-
WebsocketOutbound outbound,
44+
45+
protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
4646
URI uri, NettyDataBufferFactory factory) {
4747

4848
super(new WebSocketConnection(inbound, outbound), uri, factory);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
*/
4141
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
4242

43+
4344
public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
4445
super(conn, uri, factory);
4546
}

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ private class StandardEndpoint extends Endpoint {
6767

6868
@Override
6969
public void onOpen(Session session, EndpointConfig config) {
70-
TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(session);
70+
TomcatWebSocketHandlerAdapter.this.session =
71+
new TomcatWebSocketSession(session, getUri(), getBufferFactory());
7172

7273
session.addMessageHandler(String.class, message -> {
7374
WebSocketMessage webSocketMessage = toMessage(message);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.web.reactive.socket.adapter;
1818

1919
import java.io.IOException;
20+
import java.net.URI;
2021
import java.nio.ByteBuffer;
2122
import java.nio.charset.StandardCharsets;
2223
import javax.websocket.CloseReason;
@@ -27,6 +28,7 @@
2728

2829
import reactor.core.publisher.Mono;
2930

31+
import org.springframework.core.io.buffer.DataBufferFactory;
3032
import org.springframework.web.reactive.socket.CloseStatus;
3133
import org.springframework.web.reactive.socket.WebSocketMessage;
3234
import org.springframework.web.reactive.socket.WebSocketSession;
@@ -40,8 +42,9 @@
4042
*/
4143
public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> {
4244

43-
public TomcatWebSocketSession(Session session) {
44-
super(session, session.getId(), session.getRequestURI());
45+
46+
public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) {
47+
super(session, session.getId(), uri, bufferFactory);
4548
}
4649

4750

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public UndertowWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResp
5959

6060
@Override
6161
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
62-
this.session = new UndertowWebSocketSession(channel, getUri());
62+
this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory());
6363
channel.getReceiveSetter().set(new UndertowReceiveListener());
6464
channel.resumeReceives();
6565

0 commit comments

Comments
 (0)