Skip to content

Commit 9e20a25

Browse files
wilkinsonarstoyanchev
authored andcommitted
Introduce SubProtocolHandler abstraction
Add SubProtocolHandler to encapsulate the logic for using a sub-protocol. A SubProtocolWebSocketHandler is also provided to delegate to the appropriate SubProtocolHandler based on the negotiated sub-protocol value at handshake. StompSubProtocolHandler provides handling for STOMP messages. Issue: SPR-10786
1 parent e4d83bb commit 9e20a25

File tree

10 files changed

+532
-341
lines changed

10 files changed

+532
-341
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.handler.websocket;
18+
19+
import java.util.List;
20+
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.MessageChannel;
23+
import org.springframework.web.socket.CloseStatus;
24+
import org.springframework.web.socket.WebSocketMessage;
25+
import org.springframework.web.socket.WebSocketSession;
26+
27+
28+
/**
29+
* A contract for handling WebSocket messages as part of a higher level protocol, referred
30+
* to as "sub-protocol" in the WebSocket RFC specification. Handles both
31+
* {@link WebSocketMessage}s from a client as well as {@link Message}s to a client.
32+
* <p>
33+
* Implementations of this interface can be configured on a
34+
* {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to delegate
35+
* messages to based on the sub-protocol requested by the client through the
36+
* {@code Sec-WebSocket-Protocol} request header.
37+
*
38+
* @author Andy Wilkinson
39+
* @author Rossen Stoyanchev
40+
*
41+
* @since 4.0
42+
*/
43+
public interface SubProtocolHandler {
44+
45+
/**
46+
* Return the list of sub-protocols supported by this handler, never {@code null}.
47+
*/
48+
List<String> getSupportedProtocols();
49+
50+
/**
51+
* Handle the given {@link WebSocketMessage} received from a client.
52+
*
53+
* @param session the client session
54+
* @param message the client message
55+
* @param outputChannel an output channel to send messages to
56+
*/
57+
void handleMessageFromClient(WebSocketSession session, WebSocketMessage message,
58+
MessageChannel outputChannel) throws Exception;
59+
60+
/**
61+
* Handle the given {@link Message} to the client associated with the given WebSocket
62+
* session.
63+
*
64+
* @param session the client session
65+
* @param message the client message
66+
*/
67+
void handleMessageToClient(WebSocketSession session, Message<?> message) throws Exception;
68+
69+
/**
70+
* Resolve the session id from the given message or return {@code null}.
71+
*
72+
* @param the message to resolve the session id from
73+
*/
74+
String resolveSessionId(Message<?> message);
75+
76+
/**
77+
* Invoked after a {@link WebSocketSession} has started.
78+
*
79+
* @param session the client session
80+
* @param outputChannel a channel
81+
*/
82+
void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) throws Exception;
83+
84+
/**
85+
* Invoked after a {@link WebSocketSession} has ended.
86+
*
87+
* @param session the client session
88+
* @param closeStatus the reason why the session was closed
89+
* @param outputChannel a channel
90+
*/
91+
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus,
92+
MessageChannel outputChannel) throws Exception;
93+
94+
}
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.handler.websocket;
18+
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.TreeMap;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
25+
import org.apache.commons.logging.Log;
26+
import org.apache.commons.logging.LogFactory;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.messaging.MessageChannel;
29+
import org.springframework.messaging.MessageHandler;
30+
import org.springframework.messaging.MessagingException;
31+
import org.springframework.util.Assert;
32+
import org.springframework.util.CollectionUtils;
33+
import org.springframework.web.socket.CloseStatus;
34+
import org.springframework.web.socket.WebSocketHandler;
35+
import org.springframework.web.socket.WebSocketMessage;
36+
import org.springframework.web.socket.WebSocketSession;
37+
38+
39+
/**
40+
* A {@link WebSocketHandler} that delegates messages to a {@link SubProtocolHandler}
41+
* based on the sub-protocol value requested by the client through the
42+
* {@code Sec-WebSocket-Protocol} request header A default handler can also be configured
43+
* to use if the client does not request a specific sub-protocol.
44+
*
45+
* @author Rossen Stoyanchev
46+
* @author Andy Wilkinson
47+
*
48+
* @since 4.0
49+
*/
50+
public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHandler {
51+
52+
private final Log logger = LogFactory.getLog(SubProtocolWebSocketHandler.class);
53+
54+
private final MessageChannel outputChannel;
55+
56+
private final Map<String, SubProtocolHandler> protocolHandlers =
57+
new TreeMap<String, SubProtocolHandler>(String.CASE_INSENSITIVE_ORDER);
58+
59+
private SubProtocolHandler defaultProtocolHandler;
60+
61+
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<String, WebSocketSession>();
62+
63+
64+
/**
65+
* @param outputChannel
66+
*/
67+
public SubProtocolWebSocketHandler(MessageChannel outputChannel) {
68+
Assert.notNull(outputChannel, "outputChannel is required");
69+
this.outputChannel = outputChannel;
70+
}
71+
72+
/**
73+
* Configure one or more handlers to use depending on the sub-protocol requested by
74+
* the client in the WebSocket handshake request.
75+
*
76+
* @param protocolHandlers the sub-protocol handlers to use
77+
*/
78+
public void setProtocolHandlers(List<SubProtocolHandler> protocolHandlers) {
79+
this.protocolHandlers.clear();
80+
for (SubProtocolHandler handler: protocolHandlers) {
81+
List<String> protocols = handler.getSupportedProtocols();
82+
if (CollectionUtils.isEmpty(protocols)) {
83+
logger.warn("No sub-protocols, ignoring handler " + handler);
84+
continue;
85+
}
86+
for (String protocol: protocols) {
87+
SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler);
88+
if (replaced != null) {
89+
throw new IllegalStateException("Failed to map handler " + handler
90+
+ " to protocol '" + protocol + "', it is already mapped to handler " + replaced);
91+
}
92+
}
93+
}
94+
if ((this.protocolHandlers.size() == 1) &&(this.defaultProtocolHandler == null)) {
95+
this.defaultProtocolHandler = this.protocolHandlers.values().iterator().next();
96+
}
97+
}
98+
99+
/**
100+
* @return the configured sub-protocol handlers
101+
*/
102+
public Map<String, SubProtocolHandler> getProtocolHandlers() {
103+
return this.protocolHandlers;
104+
}
105+
106+
/**
107+
* Set the {@link SubProtocolHandler} to use when the client did not request a
108+
* sub-protocol.
109+
*
110+
* @param defaultProtocolHandler the default handler
111+
*/
112+
public void setDefaultProtocolHandler(SubProtocolHandler defaultProtocolHandler) {
113+
this.defaultProtocolHandler = defaultProtocolHandler;
114+
if (this.protocolHandlers.isEmpty()) {
115+
setProtocolHandlers(Arrays.asList(defaultProtocolHandler));
116+
}
117+
}
118+
119+
/**
120+
* @return the default sub-protocol handler to use
121+
*/
122+
public SubProtocolHandler getDefaultProtocolHandler() {
123+
return this.defaultProtocolHandler;
124+
}
125+
126+
127+
@Override
128+
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
129+
this.sessions.put(session.getId(), session);
130+
getProtocolHandler(session).afterSessionStarted(session, this.outputChannel);
131+
}
132+
133+
protected final SubProtocolHandler getProtocolHandler(WebSocketSession session) {
134+
SubProtocolHandler handler;
135+
String protocol = session.getAcceptedProtocol();
136+
if (protocol != null) {
137+
handler = this.protocolHandlers.get(protocol);
138+
Assert.state(handler != null,
139+
"No handler for sub-protocol '" + protocol + "', handlers=" + this.protocolHandlers);
140+
}
141+
else {
142+
handler = this.defaultProtocolHandler;
143+
Assert.state(handler != null,
144+
"No sub-protocol was requested and a default sub-protocol handler was not configured");
145+
}
146+
return handler;
147+
}
148+
149+
@Override
150+
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
151+
getProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel);
152+
}
153+
154+
@Override
155+
public void handleMessage(Message<?> message) throws MessagingException {
156+
157+
String sessionId = resolveSessionId(message);
158+
if (sessionId == null) {
159+
logger.error("sessionId not found in message " + message);
160+
return;
161+
}
162+
163+
WebSocketSession session = this.sessions.get(sessionId);
164+
if (session == null) {
165+
logger.error("Session not found for session with id " + sessionId);
166+
return;
167+
}
168+
169+
try {
170+
getProtocolHandler(session).handleMessageToClient(session, message);
171+
}
172+
catch (Exception e) {
173+
logger.error("Failed to send message to client " + message, e);
174+
}
175+
}
176+
177+
private String resolveSessionId(Message<?> message) {
178+
for (SubProtocolHandler handler : this.protocolHandlers.values()) {
179+
String sessionId = handler.resolveSessionId(message);
180+
if (sessionId != null) {
181+
return sessionId;
182+
}
183+
}
184+
if (this.defaultProtocolHandler != null) {
185+
String sessionId = this.defaultProtocolHandler.resolveSessionId(message);
186+
if (sessionId != null) {
187+
return sessionId;
188+
}
189+
}
190+
return null;
191+
}
192+
193+
@Override
194+
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
195+
}
196+
197+
@Override
198+
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
199+
this.sessions.remove(session.getId());
200+
getProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel);
201+
}
202+
203+
@Override
204+
public boolean supportsPartialMessages() {
205+
return false;
206+
}
207+
208+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
/**
2-
* Generic support for simple messaging protocols (like STOMP).
2+
* Generic support for SImple Messaging Protocols such as STOMP.
33
*/
44
package org.springframework.messaging.simp;

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public void start() {
197197

198198
/**
199199
* Open a "system" session for sending messages from parts of the application
200-
* not assoicated with a client STOMP session.
200+
* not associated with a client STOMP session.
201201
*/
202202
private void openSystemSession() {
203203

@@ -449,12 +449,21 @@ public void forward(Message<?> message) {
449449
}
450450
}
451451

452-
private boolean forwardInternal(Message<?> message, TcpConnection<String, String> connection) {
452+
private boolean forwardInternal(final Message<?> message, TcpConnection<String, String> connection) {
453453
if (logger.isTraceEnabled()) {
454454
logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
455455
}
456456
byte[] bytes = stompMessageConverter.fromMessage(message);
457-
connection.send(new String(bytes, Charset.forName("UTF-8")));
457+
connection.send(new String(bytes, Charset.forName("UTF-8")), new Consumer<Boolean>() {
458+
@Override
459+
public void accept(Boolean success) {
460+
if (!success) {
461+
String sessionId = StompHeaderAccessor.wrap(message).getSessionId();
462+
relaySessions.remove(sessionId);
463+
sendError(sessionId, "Failed to relay message to broker");
464+
}
465+
}
466+
});
458467

459468
// TODO: detect if send fails and send ERROR downstream (except on DISCONNECT)
460469
return true;

0 commit comments

Comments
 (0)