Skip to content

Commit b5745af

Browse files
committed
INT-3937: Deprecate Reactor Environment usage
JIRA: https://jira.spring.io/browse/INT-3937
1 parent 4bfcdb9 commit b5745af

File tree

12 files changed

+41
-87
lines changed

12 files changed

+41
-87
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ project('spring-integration-core') {
317317

318318
testCompile ("org.aspectj:aspectjweaver:$aspectjVersion")
319319
testCompile ("net.openhft:chronicle:$chronicleVersion")
320-
testCompile ("io.projectreactor.spring:reactor-spring-context:$reactorVersion")
321320
}
322321
}
323322

spring-integration-core/src/main/java/org/springframework/integration/annotation/MessagingGateway.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@
135135
* <p> This attribute is required in case of {@link reactor.rx.Promise} usage.
136136
* @return the suggested reactor Environment bean name.
137137
* @since 4.1
138+
* @deprecated with no-op in favor of global JVM-wide Reactor configuration.
138139
*/
140+
@Deprecated
139141
String reactorEnvironment() default "";
140142

141143
}

spring-integration-core/src/main/java/org/springframework/integration/config/MessagingGatewayRegistrar.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
9999
String defaultReplyChannel = (String) gatewayAttributes.get("defaultReplyChannel");
100100
String errorChannel = (String) gatewayAttributes.get("errorChannel");
101101
String asyncExecutor = (String) gatewayAttributes.get("asyncExecutor");
102-
String reactorEnvironment = (String) gatewayAttributes.get("reactorEnvironment");
103102
String mapper = (String) gatewayAttributes.get("mapper");
104103

105104
boolean hasMapper = StringUtils.hasText(mapper);
@@ -125,7 +124,7 @@ public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
125124
String headerExpression = (String) header.get("expression");
126125
boolean hasValue = StringUtils.hasText(headerValue);
127126

128-
if (!(hasValue ^ StringUtils.hasText(headerExpression))) {
127+
if (hasValue == StringUtils.hasText(headerExpression)) {
129128
throw new BeanDefinitionStoreException("exactly one of 'value' or 'expression' " +
130129
"is required on a gateway's header.");
131130
}
@@ -157,9 +156,6 @@ public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
157156
else if (StringUtils.hasText(asyncExecutor)) {
158157
gatewayProxyBuilder.addPropertyReference("asyncExecutor", asyncExecutor);
159158
}
160-
if (StringUtils.hasText(reactorEnvironment)) {
161-
gatewayProxyBuilder.addPropertyReference("reactorEnvironment", reactorEnvironment);
162-
}
163159
if (StringUtils.hasText(mapper)) {
164160
gatewayProxyBuilder.addPropertyReference("mapper", mapper);
165161
}

spring-integration-core/src/main/java/org/springframework/integration/config/xml/GatewayParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ public BeanDefinition parse(final Element element, ParserContext parserContext)
7373
}
7474

7575
gatewayAttributes.put("mapper", element.getAttribute("mapper"));
76-
gatewayAttributes.put("reactorEnvironment", element.getAttribute("reactor-environment"));
7776
gatewayAttributes.put("defaultReplyTimeout",
7877
element.getAttribute(isNested ? "reply-timeout" : "default-reply-timeout"));
7978
gatewayAttributes.put("defaultRequestTimeout",

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ public class GatewayProxyFactoryBean extends AbstractEndpoint
118118

119119
private volatile Class<?> asyncSubmitListenableType;
120120

121-
private volatile Object reactorEnvironment;
122-
123121
private volatile boolean initialized;
124122

125123
private final Object initializationMonitor = new Object();
@@ -256,12 +254,11 @@ public void setGlobalMethodMetadata(GatewayMethodMetadata globalMethodMetadata)
256254
* service interface).
257255
* @param reactorEnvironment the Reactor Environment.
258256
* @since 4.1
257+
* @deprecated with no-op in favor of global JVM-wide Reactor configuration.
259258
*/
259+
@Deprecated
260260
public void setReactorEnvironment(Object reactorEnvironment) {
261-
if (!Environment.class.getName().equals(reactorEnvironment.getClass().getName())) {
262-
throw new IllegalArgumentException("The 'reactorEnvironment' must be instance of 'reactor.Environment'");
263-
}
264-
this.reactorEnvironment = reactorEnvironment;
261+
265262
}
266263

267264
@Override
@@ -364,10 +361,7 @@ else if (Future.class.isAssignableFrom(returnType)) {
364361
}
365362
}
366363
if (reactorPresent && Promise.class.isAssignableFrom(returnType)) {
367-
if (this.reactorEnvironment == null) {
368-
throw new IllegalStateException("'reactorEnvironment' is required in case of 'Promise' return type.");
369-
}
370-
return Promises.<Object>task((Environment) this.reactorEnvironment,
364+
return Promises.<Object>task(Environment.initializeIfEmpty(),
371365
reactor.fn.Functions.supplier(new AsyncInvocationTask(invocation)));
372366
}
373367
return this.doInvoke(invocation, true);

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration-4.3.xsd

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -789,14 +789,13 @@
789789
<xsd:attribute name="reactor-environment" type="xsd:string">
790790
<xsd:annotation>
791791
<xsd:documentation>
792-
<![CDATA[
792+
DEPRECATED with no-op in favor of global JVM-wide Reactor configuration.
793793
Provide a reference to 'reactor.core.Environment'
794794
to use for any of the interface methods that have a 'reactor.core.composable.Promise' return type.
795795
The Reactor's Environment will only be used for those async methods; the sync methods
796796
will be invoked in the caller's thread.
797797
This attribute is required if any 'service-interface' methods
798798
have a 'reactor.core.composable.Promise' return type.
799-
]]>
800799
</xsd:documentation>
801800
</xsd:annotation>
802801
</xsd:attribute>

spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests-context.xml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@
4949
<gateway id="promise"
5050
service-interface="org.springframework.integration.gateway.TestService"
5151
default-request-channel="requestChannel"
52-
default-reply-channel="replyChannel"
53-
reactor-environment="reactorEnvironment"/>
52+
default-reply-channel="replyChannel"/>
5453

5554
<gateway id="asyncCompletable"
5655
service-interface="org.springframework.integration.gateway.TestService"
@@ -83,7 +82,7 @@
8382
async-executor="testExecutor">
8483
<default-header name="baz" value="qux"/>
8584
<method name="oneWay" request-channel="otherRequestChannel"
86-
request-timeout="456"
85+
request-timeout="456"
8786
reply-timeout="123"
8887
payload-expression="'fiz'"
8988
reply-channel="foo">
@@ -96,6 +95,4 @@
9695

9796
<beans:bean id="testExecutor" class="org.springframework.integration.config.xml.GatewayParserTests$TestExecutor"/>
9897

99-
<beans:bean id="reactorEnvironment" class="reactor.Environment" destroy-method="shutdown"/>
100-
10198
</beans:beans>

spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,18 +128,17 @@
128128
import org.springframework.test.context.support.AnnotationConfigContextLoader;
129129
import org.springframework.util.MultiValueMap;
130130

131-
import reactor.Environment;
132131
import reactor.rx.Promise;
133132
import reactor.rx.Streams;
134-
import reactor.spring.context.config.EnableReactor;
135133

136134
/**
137135
* @author Artem Bilan
138136
* @author Gary Russell
139137
* @since 4.0
140138
*/
141139
@ContextConfiguration(loader = AnnotationConfigContextLoader.class,
142-
classes = {EnableIntegrationTests.ContextConfiguration.class, EnableIntegrationTests.ContextConfiguration2.class})
140+
classes = {EnableIntegrationTests.ContextConfiguration.class,
141+
EnableIntegrationTests.ContextConfiguration2.class})
143142
@RunWith(SpringJUnit4ClassRunner.class)
144143
@DirtiesContext
145144
public class EnableIntegrationTests {
@@ -606,17 +605,13 @@ public void testBridgeAnnotations() {
606605
assertNull(replyChannel.receive(10));
607606
}
608607

609-
@Autowired
610-
private Environment environment;
611-
612608
@Test
613609
public void testPromiseGateway() throws Exception {
614610

615611
final AtomicReference<List<Integer>> ref = new AtomicReference<List<Integer>>();
616612
final CountDownLatch consumeLatch = new CountDownLatch(1);
617613

618614
Streams.just("1", "2", "3", "4", "5")
619-
.dispatchOn(this.environment)
620615
.map(Integer::parseInt)
621616
.flatMap(this.testGateway::multiply)
622617
.toList()
@@ -911,7 +906,6 @@ public Integer getInvoked() {
911906
@EnableMessageHistory("${message.history.tracked.components}")
912907
@EnablePublisher("publishedChannel")
913908
@EnableAsync
914-
@EnableReactor
915909
public static class ContextConfiguration2 {
916910

917911
/*
@@ -1332,7 +1326,7 @@ public interface TestGateway2 {
13321326

13331327
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
13341328
@Retention(RetentionPolicy.RUNTIME)
1335-
@MessagingGateway(defaultRequestChannel = "gatewayChannel", reactorEnvironment = "reactorEnv",
1329+
@MessagingGateway(defaultRequestChannel = "gatewayChannel",
13361330
defaultRequestTimeout = "${default.request.timeout:12300}", defaultReplyTimeout = "#{13400}",
13371331
defaultHeaders = @GatewayHeader(name = "foo", value = "FOO"))
13381332
public @interface TestMessagingGateway {

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotEquals;
2121
import static org.junit.Assert.assertNotNull;
22-
import static org.junit.Assert.assertThat;
2322
import static org.junit.Assert.assertTrue;
2423
import static org.junit.Assert.fail;
2524
import static org.mockito.Mockito.mock;
@@ -31,11 +30,7 @@
3130
import java.util.concurrent.TimeoutException;
3231
import java.util.concurrent.atomic.AtomicReference;
3332

34-
import org.hamcrest.Matchers;
3533
import org.junit.Test;
36-
import reactor.Environment;
37-
import reactor.fn.Consumer;
38-
import reactor.rx.Promise;
3934

4035
import org.springframework.beans.factory.BeanFactory;
4136
import org.springframework.integration.annotation.Gateway;
@@ -50,6 +45,9 @@
5045
import org.springframework.util.concurrent.ListenableFuture;
5146
import org.springframework.util.concurrent.ListenableFutureCallback;
5247

48+
import reactor.fn.Consumer;
49+
import reactor.rx.Promise;
50+
5351
/**
5452
* @author Mark Fisher
5553
* @author Oleg Zhurakousky
@@ -59,15 +57,9 @@
5957
*/
6058
public class AsyncGatewayTests {
6159

62-
private final Environment reactorEnvironment = new Environment();
63-
6460
// TODO: changed from 0 because of recurrent failure: is this right?
6561
private final long safety = 100;
6662

67-
public void tearDown() {
68-
this.reactorEnvironment.shutdown();
69-
}
70-
7163
@Test
7264
public void futureWithMessageReturned() throws Exception {
7365
QueueChannel requestChannel = new QueueChannel();
@@ -255,7 +247,6 @@ public void promiseWithMessageReturned() throws Exception {
255247
proxyFactory.setServiceInterface(TestEchoService.class);
256248
proxyFactory.setBeanFactory(mock(BeanFactory.class));
257249
proxyFactory.setBeanName("testGateway");
258-
proxyFactory.setReactorEnvironment(this.reactorEnvironment);
259250
proxyFactory.afterPropertiesSet();
260251
TestEchoService service = (TestEchoService) proxyFactory.getObject();
261252
Promise<Message<?>> promise = service.returnMessagePromise("foo");
@@ -271,7 +262,6 @@ public void promiseWithPayloadReturned() throws Exception {
271262
proxyFactory.setDefaultRequestChannel(requestChannel);
272263
proxyFactory.setServiceInterface(TestEchoService.class);
273264
proxyFactory.setBeanFactory(mock(BeanFactory.class));
274-
proxyFactory.setReactorEnvironment(this.reactorEnvironment);
275265
proxyFactory.setBeanName("testGateway");
276266
proxyFactory.afterPropertiesSet();
277267
TestEchoService service = (TestEchoService) proxyFactory.getObject();
@@ -289,7 +279,6 @@ public void promiseWithWildcardReturned() throws Exception {
289279
proxyFactory.setServiceInterface(TestEchoService.class);
290280
proxyFactory.setBeanFactory(mock(BeanFactory.class));
291281
proxyFactory.setBeanName("testGateway");
292-
proxyFactory.setReactorEnvironment(this.reactorEnvironment);
293282
proxyFactory.afterPropertiesSet();
294283
TestEchoService service = (TestEchoService) proxyFactory.getObject();
295284
Promise<?> promise = service.returnSomethingPromise("foo");
@@ -307,7 +296,6 @@ public void promiseWithConsumer() throws Exception {
307296
proxyFactory.setServiceInterface(TestEchoService.class);
308297
proxyFactory.setBeanFactory(mock(BeanFactory.class));
309298
proxyFactory.setBeanName("testGateway");
310-
proxyFactory.setReactorEnvironment(this.reactorEnvironment);
311299
proxyFactory.afterPropertiesSet();
312300
TestEchoService service = (TestEchoService) proxyFactory.getObject();
313301
Promise<String> promise = service.returnStringPromise("foo");
@@ -327,24 +315,6 @@ public void accept(String s) {
327315
assertEquals("foobar", result.get());
328316
}
329317

330-
@Test
331-
public void promiseMethodWithoutEnvironment() throws Exception {
332-
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
333-
proxyFactory.setServiceInterface(TestEchoService.class);
334-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
335-
proxyFactory.setBeanName("testGateway");
336-
proxyFactory.afterPropertiesSet();
337-
TestEchoService service = (TestEchoService) proxyFactory.getObject();
338-
try {
339-
service.returnStringPromise("foo");
340-
fail("IllegalStateException expected");
341-
}
342-
catch (Exception e) {
343-
assertThat(e, Matchers.instanceOf(IllegalStateException.class));
344-
assertEquals(e.getMessage(), "'reactorEnvironment' is required in case of 'Promise' return type.");
345-
}
346-
}
347-
348318
private static void startResponder(final PollableChannel requestChannel) {
349319
new Thread(new Runnable() {
350320

spring-integration-core/src/test/java/org/springframework/integration/routingslip/RoutingSlipTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.springframework.test.context.support.GenericXmlContextLoader;
6262

6363
import reactor.rx.Streams;
64-
import reactor.spring.context.config.EnableReactor;
6564

6665
/**
6766
* @author Artem Bilan
@@ -174,7 +173,6 @@ public Object getNextPath(Message<?> requestMessage, Object reply) {
174173
}
175174

176175
@Configuration
177-
@EnableReactor
178176
@EnableIntegration
179177
public static class RoutingSlipConfiguration {
180178

0 commit comments

Comments
 (0)