1+ /*
2+ * Copyright 2002-2016 the original author or authors.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package org .springframework .web .reactive .socket .adapter ;
18+
19+ import java .io .IOException ;
20+ import java .net .URI ;
21+ import java .util .concurrent .atomic .AtomicBoolean ;
22+
23+ import org .reactivestreams .Publisher ;
24+ import reactor .core .publisher .Flux ;
25+ import reactor .core .publisher .Mono ;
26+
27+ import org .springframework .http .server .reactive .AbstractListenerReadPublisher ;
28+ import org .springframework .http .server .reactive .AbstractListenerWriteProcessor ;
29+ import org .springframework .util .Assert ;
30+ import org .springframework .web .reactive .socket .CloseStatus ;
31+ import org .springframework .web .reactive .socket .WebSocketMessage ;
32+ import org .springframework .web .reactive .socket .WebSocketMessage .Type ;
33+ import org .springframework .web .reactive .socket .WebSocketSession ;
34+
35+ /**
36+ * Base class for Listener-based {@link WebSocketSession} adapters.
37+ *
38+ * @author Violeta Georgieva
39+ * @author Rossen Stoyanchev
40+ * @since 5.0
41+ */
42+ public abstract class AbstractListenerWebSocketSession <T > extends WebSocketSessionSupport <T > {
43+
44+ /**
45+ * The "back-pressure" buffer size to use if the underlying WebSocket API
46+ * does not have flow control for receiving messages.
47+ */
48+ private static final int RECEIVE_BUFFER_SIZE = 8192 ;
49+
50+
51+ private final String id ;
52+
53+ private final URI uri ;
54+
55+ private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher ();
56+
57+ private volatile WebSocketSendProcessor sendProcessor ;
58+
59+ private final AtomicBoolean sendCalled = new AtomicBoolean ();
60+
61+
62+ public AbstractListenerWebSocketSession (T delegate , String id , URI uri ) {
63+ super (delegate );
64+ Assert .notNull (id , "'id' is required." );
65+ Assert .notNull (uri , "'uri' is required." );
66+ this .id = id ;
67+ this .uri = uri ;
68+ }
69+
70+
71+ @ Override
72+ public String getId () {
73+ return this .id ;
74+ }
75+
76+ @ Override
77+ public URI getUri () {
78+ return this .uri ;
79+ }
80+
81+ protected WebSocketSendProcessor getSendProcessor () {
82+ return this .sendProcessor ;
83+ }
84+
85+ @ Override
86+ public Flux <WebSocketMessage > receive () {
87+ return canSuspendReceiving () ?
88+ Flux .from (this .receivePublisher ) :
89+ Flux .from (this .receivePublisher ).onBackpressureBuffer (RECEIVE_BUFFER_SIZE );
90+ }
91+
92+ @ Override
93+ public Mono <Void > send (Publisher <WebSocketMessage > messages ) {
94+ if (this .sendCalled .compareAndSet (false , true )) {
95+ this .sendProcessor = new WebSocketSendProcessor ();
96+ return Mono .from (subscriber -> {
97+ messages .subscribe (this .sendProcessor );
98+ this .sendProcessor .subscribe (subscriber );
99+ });
100+ }
101+ else {
102+ return Mono .error (new IllegalStateException ("send() has already been called" ));
103+ }
104+ }
105+
106+ /**
107+ * Whether the underlying WebSocket API has flow control and can suspend and
108+ * resume the receiving of messages.
109+ */
110+ protected abstract boolean canSuspendReceiving ();
111+
112+ /**
113+ * Suspend receiving until received message(s) are processed and more demand
114+ * is generated by the downstream Subscriber.
115+ * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
116+ * flow control for receiving messages, and this method should be a no-op
117+ * and {@link #canSuspendReceiving()} should return {@code false}.
118+ */
119+ protected abstract void suspendReceiving ();
120+
121+ /**
122+ * Resume receiving new message(s) after demand is generated by the
123+ * downstream Subscriber.
124+ * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
125+ * flow control for receiving messages, and this method should be a no-op
126+ * and {@link #canSuspendReceiving()} should return {@code false}.
127+ */
128+ protected abstract void resumeReceiving ();
129+
130+ /**
131+ * Send the given WebSocket message.
132+ */
133+ protected abstract boolean sendMessage (WebSocketMessage message ) throws IOException ;
134+
135+
136+ // WebSocketHandler adapter delegate methods
137+
138+ /** Handle a message callback from the WebSocketHandler adapter */
139+ void handleMessage (Type type , WebSocketMessage message ) {
140+ this .receivePublisher .handleMessage (message );
141+ }
142+
143+ /** Handle an error callback from the WebSocketHandler adapter */
144+ void handleError (Throwable ex ) {
145+ this .receivePublisher .onError (ex );
146+ if (this .sendProcessor != null ) {
147+ this .sendProcessor .cancel ();
148+ this .sendProcessor .onError (ex );
149+ }
150+ }
151+
152+ /** Handle a close callback from the WebSocketHandler adapter */
153+ void handleClose (CloseStatus reason ) {
154+ this .receivePublisher .onAllDataRead ();
155+ if (this .sendProcessor != null ) {
156+ this .sendProcessor .cancel ();
157+ this .sendProcessor .onComplete ();
158+ }
159+ }
160+
161+
162+ private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher <WebSocketMessage > {
163+
164+ private volatile WebSocketMessage webSocketMessage ;
165+
166+ @ Override
167+ protected void checkOnDataAvailable () {
168+ if (this .webSocketMessage != null ) {
169+ onDataAvailable ();
170+ }
171+ }
172+
173+ @ Override
174+ protected WebSocketMessage read () throws IOException {
175+ if (this .webSocketMessage != null ) {
176+ WebSocketMessage result = this .webSocketMessage ;
177+ this .webSocketMessage = null ;
178+ resumeReceiving ();
179+ return result ;
180+ }
181+
182+ return null ;
183+ }
184+
185+ void handleMessage (WebSocketMessage webSocketMessage ) {
186+ this .webSocketMessage = webSocketMessage ;
187+ suspendReceiving ();
188+ onDataAvailable ();
189+ }
190+ }
191+
192+ protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor <WebSocketMessage > {
193+
194+ private volatile boolean isReady = true ;
195+
196+ @ Override
197+ protected boolean write (WebSocketMessage message ) throws IOException {
198+ return sendMessage (message );
199+ }
200+
201+ @ Override
202+ protected void releaseData () {
203+ if (logger .isTraceEnabled ()) {
204+ logger .trace ("releaseData: " + this .currentData );
205+ }
206+ this .currentData = null ;
207+ }
208+
209+ @ Override
210+ protected boolean isDataEmpty (WebSocketMessage message ) {
211+ return message .getPayload ().readableByteCount () == 0 ;
212+ }
213+
214+ @ Override
215+ protected boolean isWritePossible () {
216+ return this .isReady && this .currentData != null ;
217+ }
218+
219+ /**
220+ * Sub-classes can invoke this before sending a message (false) and
221+ * after receiving the async send callback (true) effective translating
222+ * async completion callback into simple flow control.
223+ */
224+ public void setReadyToSend (boolean ready ) {
225+ this .isReady = ready ;
226+ }
227+ }
228+
229+ }
0 commit comments