Skip to content

Commit 47e1416

Browse files
committed
Minor refactoring + polish reactive WebSocket support
Rename classes not specific to Tomcat: TomcatWebSocketSession -> StandardWebSocketSession TomcatWebSocketHandlerAdapter -> StandardWebSocketHandlerAdapter WebSocketSessionSupport is renamed to AbstractWebSocketSession since it actually is a WebSocketSession and pre-implements a number of methods. ServerEndpointRegistration is now package private (mainly for use in upgrade strategies) and renamed to DefaultServerEndpointConfig.
1 parent 5fd600d commit 47e1416

32 files changed

+267
-270
lines changed

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@
2323
* Representation of WebSocket "close" status codes and reasons. Status codes
2424
* in the 1xxx range are pre-defined by the protocol.
2525
*
26-
* <p>See <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">
27-
* RFC 6455, Section 7.4.1 "Defined Status Codes"</a>.
28-
*
2926
* @author Rossen Stoyanchev
3027
* @since 5.0
28+
* @see <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">
29+
* RFC 6455, Section 7.4.1 "Defined Status Codes"</a>
3130
*/
3231
public final class CloseStatus {
3332

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/HandshakeInfo.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,42 +24,54 @@
2424
import org.springframework.util.Assert;
2525

2626
/**
27-
* Simple container of information from a WebSocket handshake request.
27+
* Simple container of information related to the handshake request that started
28+
* the {@link WebSocketSession} session.
2829
*
2930
* @author Rossen Stoyanchev
3031
* @since 5.0
32+
* @see WebSocketSession#getHandshakeInfo()
3133
*/
3234
public class HandshakeInfo {
3335

3436
private final URI uri;
3537

3638
private final HttpHeaders headers;
3739

38-
private final Mono<Principal> principal;
40+
private final Mono<Principal> principalMono;
3941

4042

41-
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principal) {
43+
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principalMono) {
4244
Assert.notNull(uri, "URI is required.");
4345
Assert.notNull(headers, "HttpHeaders are required.");
44-
Assert.notNull(principal, "Prinicpal is required.");
46+
Assert.notNull(principalMono, "Principal is required.");
4547
this.uri = uri;
4648
this.headers = headers;
47-
this.principal = principal;
49+
this.principalMono = principalMono;
4850
}
4951

5052

53+
/**
54+
* Return the URL for the WebSocket endpoint.
55+
*/
5156
public URI getUri() {
5257
return this.uri;
5358
}
5459

60+
/**
61+
* Return the headers from the handshake HTTP request.
62+
*/
5563
public HttpHeaders getHeaders() {
5664
return this.headers;
5765
}
5866

67+
/**
68+
* Return the principal associated with the handshake HTTP request, if any.
69+
*/
5970
public Mono<Principal> getPrincipal() {
60-
return this.principal;
71+
return this.principalMono;
6172
}
6273

74+
6375
@Override
6476
public String toString() {
6577
return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]";

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import reactor.core.publisher.Mono;
2222

2323
/**
24-
* Handler for a WebSocket-style session interaction.
24+
* Handler for a WebSocket session.
2525
*
2626
* @author Rossen Stoyanchev
2727
* @since 5.0
@@ -37,9 +37,10 @@ default List<String> getSubProtocols() {
3737
}
3838

3939
/**
40-
* Handle the given WebSocket session.
41-
* @param session the session
42-
* @return signals completion for session handling
40+
* Handle the WebSocket session.
41+
* @param session the session to handle
42+
* @return completion {@code Mono<Void>} to indicate the outcome of the
43+
* WebSocket session handling.
4344
*/
4445
Mono<Void> handle(WebSocketSession session);
4546

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424

2525
/**
2626
* Representation of a WebSocket message.
27-
* Use one of the static factory methods in this class to create a message.
27+
* <p>See static factory methods in {@link WebSocketSession} for creating messages
28+
* with the {@link org.springframework.core.io.buffer.DataBufferFactory
29+
* DataBufferFactory} for the session.
2830
*
2931
* @author Rossen Stoyanchev
3032
* @since 5.0
@@ -37,15 +39,10 @@ public class WebSocketMessage {
3739

3840

3941
/**
40-
* Constructor for a WebSocketMessage. To create, see factory methods:
41-
* <ul>
42-
* <li>{@link WebSocketSession#textMessage}
43-
* <li>{@link WebSocketSession#binaryMessage}
44-
* <li>{@link WebSocketSession#pingMessage}
45-
* <li>{@link WebSocketSession#pongMessage}
46-
* </ul>
47-
* <p>Alternatively use {@link WebSocketSession#bufferFactory()} to create
48-
* the payload and then invoke this constructor.
42+
* Constructor for a WebSocketMessage.
43+
* <p>See static factory methods in {@link WebSocketSession} or alternatively
44+
* use {@link WebSocketSession#bufferFactory()} to create the payload and
45+
* then invoke this constructor.
4946
*/
5047
public WebSocketMessage(Type type, DataBuffer payload) {
5148
Assert.notNull(type, "'type' must not be null");
@@ -81,7 +78,7 @@ public String getPayloadAsText() {
8178

8279
/**
8380
* Retain the data buffer for the message payload, which is useful on
84-
* runtimes with pooled buffers, e.g. Netty. A shortcut for:
81+
* runtimes (e.g. Netty) with pooled buffers. A shortcut for:
8582
* <pre>
8683
* DataBuffer payload = message.getPayload();
8784
* DataBufferUtils.retain(payload);
@@ -94,8 +91,8 @@ public WebSocketMessage retain() {
9491
}
9592

9693
/**
97-
* Release the payload {@code DataBuffer} which is useful on runtimes with
98-
* pooled buffers such as Netty. Effectively a shortcut for:
94+
* Release the payload {@code DataBuffer} which is useful on runtimes
95+
* (e.g. Netty) with pooled buffers such as Netty. A shortcut for:
9996
* <pre>
10097
* DataBuffer payload = message.getPayload();
10198
* DataBufferUtils.release(payload);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,15 @@
2525
import org.springframework.core.io.buffer.DataBufferFactory;
2626

2727
/**
28-
* Representation for a WebSocket session.
28+
* Represents a WebSocket session with Reactive Streams input and output.
29+
*
30+
* <p>On the server side a WebSocket session can be handled by mapping
31+
* requests to a {@link WebSocketHandler} and ensuring there is a
32+
* {@link org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
33+
* WebSocketHandlerAdapter} strategy registered in Spring configuration.
34+
* On the client side a {@link WebSocketHandler} can be provided to a
35+
* {@link org.springframework.web.reactive.socket.client.WebSocketClient
36+
* WebSocketClient}.
2937
*
3038
* @author Rossen Stoyanchev
3139
* @since 5.0

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
import org.springframework.web.reactive.socket.WebSocketSession;
3434

3535
/**
36-
* Base class for Listener-based {@link WebSocketSession} adapters.
36+
* Base class for {@link WebSocketSession} implementations that bridge between
37+
* event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty,
38+
* Undertow) and Reactive Streams.
3739
*
3840
* @author Violeta Georgieva
3941
* @author Rossen Stoyanchev
4042
* @since 5.0
4143
*/
42-
public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessionSupport<T> {
44+
public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T> {
4345

4446
/**
4547
* The "back-pressure" buffer size to use if the underlying WebSocket API
Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,19 @@
2828
import org.springframework.core.io.buffer.DataBuffer;
2929
import org.springframework.core.io.buffer.DataBufferFactory;
3030
import org.springframework.util.Assert;
31-
import org.springframework.web.reactive.socket.CloseStatus;
3231
import org.springframework.web.reactive.socket.HandshakeInfo;
3332
import org.springframework.web.reactive.socket.WebSocketMessage;
3433
import org.springframework.web.reactive.socket.WebSocketSession;
3534

3635
/**
37-
* Base class for {@link WebSocketSession} implementations wrapping and
38-
* delegating to the native WebSocket session (or connection) of the underlying
39-
* WebSocket runtime.
36+
* Convenient base class for {@link WebSocketSession} implementations that
37+
* holds common fields and exposes accessors. Also implements the
38+
* {@code WebSocketMessage} factory methods.
4039
*
4140
* @author Rossen Stoyanchev
4241
* @since 5.0
4342
*/
44-
public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
43+
public abstract class AbstractWebSocketSession<T> implements WebSocketSession {
4544

4645
protected final Log logger = LogFactory.getLog(getClass());
4746

@@ -58,7 +57,7 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
5857
/**
5958
* Create a new instance and associate the given attributes with it.
6059
*/
61-
protected WebSocketSessionSupport(T delegate, String id, HandshakeInfo handshakeInfo,
60+
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
6261
DataBufferFactory bufferFactory) {
6362

6463
Assert.notNull(delegate, "Native session is required.");
@@ -73,10 +72,7 @@ protected WebSocketSessionSupport(T delegate, String id, HandshakeInfo handshake
7372
}
7473

7574

76-
/**
77-
* Return the native session of the underlying runtime.
78-
*/
79-
public T getDelegate() {
75+
protected T getDelegate() {
8076
return this.delegate;
8177
}
8278

@@ -105,6 +101,9 @@ public DataBufferFactory bufferFactory() {
105101
return this.bufferFactory;
106102
}
107103

104+
105+
// WebSocketMessage factory methods
106+
108107
@Override
109108
public WebSocketMessage textMessage(String payload) {
110109
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
@@ -130,16 +129,6 @@ public WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payl
130129
return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
131130
}
132131

133-
@Override
134-
public final Mono<Void> close(CloseStatus status) {
135-
if (logger.isDebugEnabled()) {
136-
logger.debug("Closing " + this);
137-
}
138-
return closeInternal(status);
139-
}
140-
141-
protected abstract Mono<Void> closeInternal(CloseStatus status);
142-
143132

144133
@Override
145134
public String toString() {

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -40,74 +40,75 @@
4040
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
4141

4242
/**
43-
* Jetty {@code WebSocketHandler} implementation adapting and
44-
* delegating to a Spring {@link WebSocketHandler}.
43+
* Jetty {@link WebSocket @WebSocket} handler that delegates events to a
44+
* reactive {@link WebSocketHandler} and its session.
4545
*
4646
* @author Violeta Georgieva
47+
* @author Rossen Stoyanchev
4748
* @since 5.0
4849
*/
4950
@WebSocket
5051
public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport {
5152

5253
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
5354

54-
private JettyWebSocketSession session;
55+
private JettyWebSocketSession delegateSession;
5556

5657

57-
public JettyWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
58-
WebSocketHandler delegate) {
58+
public JettyWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info,
59+
DataBufferFactory bufferFactory) {
5960

60-
super(handshakeInfo, bufferFactory, delegate);
61+
super(delegate, info, bufferFactory);
6162
}
6263

6364

6465
@OnWebSocketConnect
6566
public void onWebSocketConnect(Session session) {
66-
this.session = new JettyWebSocketSession(session, getHandshakeInfo(), getBufferFactory());
67+
this.delegateSession = new JettyWebSocketSession(session, getHandshakeInfo(), bufferFactory());
6768
HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
68-
getDelegate().handle(this.session).subscribe(subscriber);
69+
getDelegate().handle(this.delegateSession).subscribe(subscriber);
6970
}
7071

7172
@OnWebSocketMessage
7273
public void onWebSocketText(String message) {
73-
if (this.session != null) {
74+
if (this.delegateSession != null) {
7475
WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
75-
this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
76+
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
7677
}
7778
}
7879

7980
@OnWebSocketMessage
8081
public void onWebSocketBinary(byte[] message, int offset, int length) {
81-
if (this.session != null) {
82+
if (this.delegateSession != null) {
8283
ByteBuffer buffer = ByteBuffer.wrap(message, offset, length);
8384
WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
84-
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
85+
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
8586
}
8687
}
8788

8889
@OnWebSocketFrame
8990
public void onWebSocketFrame(Frame frame) {
90-
if (this.session != null) {
91+
if (this.delegateSession != null) {
9192
if (OpCode.PONG == frame.getOpCode()) {
9293
ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
9394
WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
94-
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
95+
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
9596
}
9697
}
9798
}
9899

99100
private <T> WebSocketMessage toMessage(Type type, T message) {
100101
if (Type.TEXT.equals(type)) {
101102
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
102-
DataBuffer buffer = getBufferFactory().wrap(bytes);
103+
DataBuffer buffer = bufferFactory().wrap(bytes);
103104
return new WebSocketMessage(Type.TEXT, buffer);
104105
}
105106
else if (Type.BINARY.equals(type)) {
106-
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
107+
DataBuffer buffer = bufferFactory().wrap((ByteBuffer) message);
107108
return new WebSocketMessage(Type.BINARY, buffer);
108109
}
109110
else if (Type.PONG.equals(type)) {
110-
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
111+
DataBuffer buffer = bufferFactory().wrap((ByteBuffer) message);
111112
return new WebSocketMessage(Type.PONG, buffer);
112113
}
113114
else {
@@ -117,15 +118,15 @@ else if (Type.PONG.equals(type)) {
117118

118119
@OnWebSocketClose
119120
public void onWebSocketClose(int statusCode, String reason) {
120-
if (this.session != null) {
121-
this.session.handleClose(new CloseStatus(statusCode, reason));
121+
if (this.delegateSession != null) {
122+
this.delegateSession.handleClose(new CloseStatus(statusCode, reason));
122123
}
123124
}
124125

125126
@OnWebSocketError
126127
public void onWebSocketError(Throwable cause) {
127-
if (this.session != null) {
128-
this.session.handleError(cause);
128+
if (this.delegateSession != null) {
129+
this.delegateSession.handleError(cause);
129130
}
130131
}
131132

@@ -144,15 +145,16 @@ public void onNext(Void aVoid) {
144145

145146
@Override
146147
public void onError(Throwable ex) {
147-
if (session != null) {
148-
session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
148+
if (delegateSession != null) {
149+
int code = CloseStatus.SERVER_ERROR.getCode();
150+
delegateSession.close(new CloseStatus(code, ex.getMessage()));
149151
}
150152
}
151153

152154
@Override
153155
public void onComplete() {
154-
if (session != null) {
155-
session.close();
156+
if (delegateSession != null) {
157+
delegateSession.close();
156158
}
157159
}
158160
}

0 commit comments

Comments
 (0)