Skip to content

Commit d41d3aa

Browse files
committed
Merge pull request #1250 from origin/update-stomp-reactor-netty
2 parents 845dbf0 + b874692 commit d41d3aa

15 files changed

+622
-807
lines changed

build.gradle

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ configure(allprojects) { project ->
7878
ext.protobufVersion = "3.1.0"
7979
ext.quartzVersion = "2.2.3"
8080
ext.reactivestreamsVersion = "1.0.0"
81-
ext.reactorVersion = "2.0.8.RELEASE"
8281
ext.reactorCoreVersion = '3.0.3.RELEASE'
8382
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
8483
ext.romeVersion = "1.7.0"
@@ -578,12 +577,8 @@ project("spring-messaging") {
578577
compile(project(":spring-core"))
579578
compile(project(":spring-context"))
580579
optional(project(":spring-oxm"))
581-
optional("io.projectreactor:reactor-core:${reactorVersion}") {
582-
force = true // enforce 2.0.x
583-
}
584-
optional("io.projectreactor:reactor-net:${reactorVersion}") {
585-
exclude group: "io.netty", module: "netty-all"
586-
}
580+
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
581+
optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
587582
optional("io.netty:netty-all:${nettyVersion}")
588583
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
589584
exclude group: "javax.servlet", module: "javax.servlet-api"
@@ -1003,10 +998,8 @@ project("spring-websocket") {
1003998
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
1004999
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
10051000
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}")
1006-
testCompile("io.projectreactor:reactor-core:${reactorVersion}") {
1007-
force = true // enforce 2.0.x
1008-
}
1009-
testCompile("io.projectreactor:reactor-net:${reactorVersion}")
1001+
testCompile("io.projectreactor:reactor-core:${reactorCoreVersion}")
1002+
testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
10101003
testCompile("io.netty:netty-all:${nettyVersion}")
10111004
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
10121005
testRuntime("org.jboss.xnio:xnio-nio:${xnioVersion}")

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java

Lines changed: 0 additions & 107 deletions
This file was deleted.

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java

Lines changed: 0 additions & 150 deletions
This file was deleted.
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.stomp;
18+
19+
import org.springframework.messaging.tcp.TcpOperations;
20+
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
21+
import org.springframework.util.Assert;
22+
import org.springframework.util.concurrent.ListenableFuture;
23+
24+
/**
25+
* A STOMP over TCP client that uses {@link ReactorNettyTcpClient}.
26+
*
27+
* @author Rossen Stoyanchev
28+
* @since 5.0
29+
*/
30+
public class ReactorNettyTcpStompClient extends StompClientSupport {
31+
32+
private final TcpOperations<byte[]> tcpClient;
33+
34+
35+
/**
36+
* Create an instance with host "127.0.0.1" and port 61613.
37+
*/
38+
public ReactorNettyTcpStompClient() {
39+
this("127.0.0.1", 61613);
40+
}
41+
42+
/**
43+
* Create an instance with the given host and port.
44+
* @param host the host
45+
* @param port the port
46+
*/
47+
public ReactorNettyTcpStompClient(String host, int port) {
48+
this.tcpClient = new ReactorNettyTcpClient<byte[]>(host, port, new StompReactorNettyCodec());
49+
}
50+
51+
/**
52+
* Create an instance with a pre-configured TCP client.
53+
* @param tcpClient the client to use
54+
*/
55+
public ReactorNettyTcpStompClient(TcpOperations<byte[]> tcpClient) {
56+
Assert.notNull(tcpClient, "'tcpClient' is required");
57+
this.tcpClient = tcpClient;
58+
}
59+
60+
/**
61+
* Connect and notify the given {@link StompSessionHandler} when connected
62+
* on the STOMP level.
63+
* @param handler the handler for the STOMP session
64+
* @return ListenableFuture for access to the session when ready for use
65+
*/
66+
public ListenableFuture<StompSession> connect(StompSessionHandler handler) {
67+
return connect(null, handler);
68+
}
69+
70+
71+
/**
72+
* An overloaded version of {@link #connect(StompSessionHandler)} that
73+
* accepts headers to use for the STOMP CONNECT frame.
74+
* @param connectHeaders headers to add to the CONNECT frame
75+
* @param handler the handler for the STOMP session
76+
* @return ListenableFuture for access to the session when ready for use
77+
*/
78+
public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) {
79+
ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
80+
this.tcpClient.connect(session);
81+
return session.getSessionFuture();
82+
}
83+
84+
/**
85+
* Shut down the client and release resources.
86+
*/
87+
public void shutdown() {
88+
this.tcpClient.shutdown();
89+
}
90+
91+
}

0 commit comments

Comments
 (0)