Skip to content

Commit edcf049

Browse files
committed
Refactoring in reactive WebSocketMessage
Move WebSocketMessage factory methods to the WebSocketSession which has the bufferFactory() needed to create message payloads. WebSocketMessage is left with one public constructor. WebSocketMessage exposes convenience retain/releasePayload methods.
1 parent 6cd92c6 commit edcf049

File tree

7 files changed

+114
-49
lines changed

7 files changed

+114
-49
lines changed

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package org.springframework.web.reactive.socket;
1717

18+
import java.nio.charset.StandardCharsets;
19+
1820
import org.springframework.core.io.buffer.DataBuffer;
21+
import org.springframework.core.io.buffer.DataBufferUtils;
1922
import org.springframework.util.Assert;
2023
import org.springframework.util.ObjectUtils;
2124

@@ -34,9 +37,17 @@ public class WebSocketMessage {
3437

3538

3639
/**
37-
* Private constructor. See static factory methods.
40+
* Constructor for a WebSocketMessage. To create, see factory methods:
41+
* <ul>
42+
* <li>{@link WebSocketSession#textMessage}
43+
* <li>{@link WebSocketSession#binaryMessage}
44+
* <li>{@link WebSocketSession#pingMessage}
45+
* <li>{@link WebSocketSession#pongMessage}
46+
* </ul>
47+
* <p>Alternatively use {@link WebSocketSession#bufferFactory()} to create
48+
* the payload and then invoke this constructor.
3849
*/
39-
private WebSocketMessage(Type type, DataBuffer payload) {
50+
public WebSocketMessage(Type type, DataBuffer payload) {
4051
Assert.notNull(type, "'type' must not be null");
4152
Assert.notNull(payload, "'payload' must not be null");
4253
this.type = type;
@@ -58,6 +69,42 @@ public DataBuffer getPayload() {
5869
return this.payload;
5970
}
6071

72+
/**
73+
* Return the message payload as UTF-8 text. This is a useful for text
74+
* WebSocket messages.
75+
*/
76+
public String getPayloadAsText() {
77+
byte[] bytes = new byte[this.payload.readableByteCount()];
78+
this.payload.read(bytes);
79+
return new String(bytes, StandardCharsets.UTF_8);
80+
}
81+
82+
/**
83+
* Retain the data buffer for the message payload, which is useful on
84+
* runtimes with pooled buffers, e.g. Netty. A shortcut for:
85+
* <pre>
86+
* DataBuffer payload = message.getPayload();
87+
* DataBufferUtils.retain(payload);
88+
* </pre>
89+
* @see DataBufferUtils#retain(DataBuffer)
90+
*/
91+
public void retainPayload() {
92+
DataBufferUtils.retain(this.payload);
93+
}
94+
95+
/**
96+
* Release the data buffer for the message payload, which is useful on
97+
* runtimes with pooled buffers, e.g. Netty. This is a shortcut for:
98+
* <pre>
99+
* DataBuffer payload = message.getPayload();
100+
* DataBufferUtils.release(payload);
101+
* </pre>
102+
* @see DataBufferUtils#release(DataBuffer)
103+
*/
104+
public void releasePayload() {
105+
DataBufferUtils.release(this.payload);
106+
}
107+
61108

62109
@Override
63110
public boolean equals(Object other) {
@@ -78,42 +125,6 @@ public int hashCode() {
78125
}
79126

80127

81-
/**
82-
* Factory method to create a text WebSocket message.
83-
*/
84-
public static WebSocketMessage text(DataBuffer payload) {
85-
return create(Type.TEXT, payload);
86-
}
87-
88-
/**
89-
* Factory method to create a binary WebSocket message.
90-
*/
91-
public static WebSocketMessage binary(DataBuffer payload) {
92-
return create(Type.BINARY, payload);
93-
}
94-
95-
/**
96-
* Factory method to create a ping WebSocket message.
97-
*/
98-
public static WebSocketMessage ping(DataBuffer payload) {
99-
return create(Type.PING, payload);
100-
}
101-
102-
/**
103-
* Factory method to create a pong WebSocket message.
104-
*/
105-
public static WebSocketMessage pong(DataBuffer payload) {
106-
return create(Type.PONG, payload);
107-
}
108-
109-
/**
110-
* Factory method to create a WebSocket message of the given type.
111-
*/
112-
public static WebSocketMessage create(Type type, DataBuffer payload) {
113-
return new WebSocketMessage(type, payload);
114-
}
115-
116-
117128
/**
118129
* WebSocket message types.
119130
*/

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
package org.springframework.web.reactive.socket;
1717

1818
import java.net.URI;
19+
import java.util.function.Function;
1920

2021
import org.reactivestreams.Publisher;
2122
import reactor.core.publisher.Flux;
2223
import reactor.core.publisher.Mono;
2324

25+
import org.springframework.core.io.buffer.DataBuffer;
2426
import org.springframework.core.io.buffer.DataBufferFactory;
2527

2628
/**
@@ -58,6 +60,30 @@ public interface WebSocketSession {
5860
*/
5961
Mono<Void> send(Publisher<WebSocketMessage> messages);
6062

63+
/**
64+
* Factory method to create a text {@link WebSocketMessage} using the
65+
* {@link #bufferFactory()} for the session.
66+
*/
67+
WebSocketMessage textMessage(String payload);
68+
69+
/**
70+
* Factory method to create a binary WebSocketMessage using the
71+
* {@link #bufferFactory()} for the session.
72+
*/
73+
WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
74+
75+
/**
76+
* Factory method to create a ping WebSocketMessage using the
77+
* {@link #bufferFactory()} for the session.
78+
*/
79+
WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
80+
81+
/**
82+
* Factory method to create a pong WebSocketMessage using the
83+
* {@link #bufferFactory()} for the session.
84+
*/
85+
WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
86+
6187
/**
6288
* Close the WebSocket session with {@link CloseStatus#NORMAL}.
6389
*/

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ private <T> WebSocketMessage toMessage(Type type, T message) {
101101
if (Type.TEXT.equals(type)) {
102102
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
103103
DataBuffer buffer = getBufferFactory().wrap(bytes);
104-
return WebSocketMessage.create(Type.TEXT, buffer);
104+
return new WebSocketMessage(Type.TEXT, buffer);
105105
}
106106
else if (Type.BINARY.equals(type)) {
107107
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
108-
return WebSocketMessage.create(Type.BINARY, buffer);
108+
return new WebSocketMessage(Type.BINARY, buffer);
109109
}
110110
else if (Type.PONG.equals(type)) {
111111
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
112-
return WebSocketMessage.create(Type.PONG, buffer);
112+
return new WebSocketMessage(Type.PONG, buffer);
113113
}
114114
else {
115115
throw new IllegalArgumentException("Unexpected message type: " + message);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
7878
Class<?> frameType = frames.get(0).getClass();
7979
if (frames.size() == 1) {
8080
NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content());
81-
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
81+
return new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer);
8282
}
8383
return frames.stream()
8484
.map(socketFrame -> bufferFactory().wrap(socketFrame.content()))
8585
.reduce(NettyDataBuffer::write)
86-
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
86+
.map(buffer -> new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer))
8787
.get();
8888
}
8989

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,15 @@ public void onOpen(Session session, EndpointConfig config) {
9090
private <T> WebSocketMessage toMessage(T message) {
9191
if (message instanceof String) {
9292
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
93-
return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes));
93+
return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
9494
}
9595
else if (message instanceof ByteBuffer) {
9696
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
97-
return WebSocketMessage.create(Type.BINARY, buffer);
97+
return new WebSocketMessage(Type.BINARY, buffer);
9898
}
9999
else if (message instanceof PongMessage) {
100100
DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData());
101-
return WebSocketMessage.create(Type.PONG, buffer);
101+
return new WebSocketMessage(Type.PONG, buffer);
102102
}
103103
else {
104104
throw new IllegalArgumentException("Unexpected message type: " + message);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,15 @@ protected void onError(WebSocketChannel channel, Throwable error) {
102102
private <T> WebSocketMessage toMessage(Type type, T message) {
103103
if (Type.TEXT.equals(type)) {
104104
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
105-
return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes));
105+
return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
106106
}
107107
else if (Type.BINARY.equals(type)) {
108108
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
109-
return WebSocketMessage.create(Type.BINARY, buffer);
109+
return new WebSocketMessage(Type.BINARY, buffer);
110110
}
111111
else if (Type.PONG.equals(type)) {
112112
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
113-
return WebSocketMessage.create(Type.PONG, buffer);
113+
return new WebSocketMessage(Type.PONG, buffer);
114114
}
115115
else {
116116
throw new IllegalArgumentException("Unexpected message type: " + message);

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717
package org.springframework.web.reactive.socket.adapter;
1818

1919
import java.net.URI;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.function.Function;
2022

2123
import org.apache.commons.logging.Log;
2224
import org.apache.commons.logging.LogFactory;
2325
import reactor.core.publisher.Mono;
2426

27+
import org.springframework.core.io.buffer.DataBuffer;
2528
import org.springframework.core.io.buffer.DataBufferFactory;
2629
import org.springframework.util.Assert;
2730
import org.springframework.web.reactive.socket.CloseStatus;
31+
import org.springframework.web.reactive.socket.WebSocketMessage;
2832
import org.springframework.web.reactive.socket.WebSocketSession;
2933

3034
/**
@@ -87,6 +91,30 @@ public DataBufferFactory bufferFactory() {
8791
return this.bufferFactory;
8892
}
8993

94+
@Override
95+
public WebSocketMessage textMessage(String payload) {
96+
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
97+
DataBuffer buffer = bufferFactory().wrap(bytes);
98+
return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer);
99+
}
100+
101+
@Override
102+
public WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
103+
DataBuffer payload = payloadFactory.apply(bufferFactory());
104+
return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload);
105+
}
106+
107+
@Override
108+
public WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
109+
DataBuffer payload = payloadFactory.apply(bufferFactory());
110+
return new WebSocketMessage(WebSocketMessage.Type.PING, payload);
111+
}
112+
113+
@Override
114+
public WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
115+
DataBuffer payload = payloadFactory.apply(bufferFactory());
116+
return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
117+
}
90118

91119
@Override
92120
public final Mono<Void> close(CloseStatus status) {

0 commit comments

Comments
 (0)