Skip to content

Commit 758a6bd

Browse files
garyrussellartembilan
authored andcommitted
INT-3914: Add CompoundTriggerAdvice
JIRA: https://jira.spring.io/browse/INT-3914
1 parent b5ec73b commit 758a6bd

File tree

5 files changed

+306
-13
lines changed

5 files changed

+306
-13
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.aop;
17+
18+
import org.springframework.integration.core.MessageSource;
19+
import org.springframework.integration.util.CompoundTrigger;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.scheduling.Trigger;
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* An {@link AbstractMessageSourceAdvice} that uses a {@link CompoundTrigger} to adjust
26+
* the poller - when a message is present, the compound trigger's primary trigger is
27+
* used to determine the next poll. When no message is present, the override trigger is
28+
* used.
29+
* <p>
30+
* The poller advised by this class must be configured to use the same
31+
* {@link CompoundTrigger} instance and must <b>not</b> use a task executor.
32+
*
33+
* @author Gary Russell
34+
* @since 4.3
35+
*
36+
*/
37+
public class CompoundTriggerAdvice extends AbstractMessageSourceAdvice {
38+
39+
private final CompoundTrigger compoundTrigger;
40+
41+
private final Trigger override;
42+
43+
public CompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTrigger) {
44+
Assert.notNull(compoundTrigger, "'compoundTrigger' cannot be null");
45+
this.compoundTrigger = compoundTrigger;
46+
this.override = overrideTrigger;
47+
}
48+
49+
@Override
50+
public boolean beforeReceive(MessageSource<?> source) {
51+
return true;
52+
}
53+
54+
@Override
55+
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
56+
if (result == null) {
57+
this.compoundTrigger.setOverride(this.override);
58+
}
59+
else {
60+
this.compoundTrigger.setOverride(null);
61+
}
62+
return result;
63+
}
64+
65+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.util;
17+
18+
import java.util.Date;
19+
20+
import org.springframework.scheduling.Trigger;
21+
import org.springframework.scheduling.TriggerContext;
22+
23+
import reactor.core.support.Assert;
24+
25+
/**
26+
* A {@link Trigger} that delegates the {@link #nextExecutionTime(TriggerContext)}
27+
* to one of two Triggers. If the {@link #setOverride(Trigger) override} trigger is
28+
* {@code null}, the primary trigger is invoked; otherwise the override trigger is
29+
* invoked.
30+
*
31+
* @author Gary Russell
32+
* @since 4.3
33+
*
34+
*/
35+
public class CompoundTrigger implements Trigger {
36+
37+
private volatile Trigger primary;
38+
39+
private volatile Trigger override;
40+
41+
/**
42+
* Construct a compound trigger with the supplied primary trigger.
43+
* @param primary the primary trigger.
44+
*/
45+
public CompoundTrigger(Trigger primary) {
46+
setPrimary(primary);
47+
}
48+
49+
/**
50+
* Set the primary trigger.
51+
* @param primary the trigger.
52+
*/
53+
public final void setPrimary(Trigger primary) {
54+
Assert.notNull(primary, "'primary' cannot be null");
55+
this.primary = primary;
56+
}
57+
58+
/**
59+
* Set the override trigger; set to null to revert to using the
60+
* primary trigger.
61+
* @param override the override trigger, or null.
62+
*/
63+
public void setOverride(Trigger override) {
64+
this.override = override;
65+
}
66+
67+
@Override
68+
public Date nextExecutionTime(TriggerContext triggerContext) {
69+
if (this.override != null) {
70+
return this.override.nextExecutionTime(triggerContext);
71+
}
72+
else {
73+
return this.primary.nextExecutionTime(triggerContext);
74+
}
75+
}
76+
77+
}

spring-integration-core/src/test/java/org/springframework/integration/endpoint/PollerAdviceTests.java

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818

1919
import static org.hamcrest.Matchers.contains;
2020
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertThat;
2223
import static org.junit.Assert.assertTrue;
24+
import static org.mockito.Matchers.any;
25+
import static org.mockito.Mockito.atLeast;
2326
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.spy;
28+
import static org.mockito.Mockito.verify;
2429

2530
import java.util.ArrayList;
2631
import java.util.Collections;
@@ -33,23 +38,27 @@
3338
import org.aopalliance.aop.Advice;
3439
import org.aopalliance.intercept.MethodInterceptor;
3540
import org.aopalliance.intercept.MethodInvocation;
36-
import org.hamcrest.Matchers;
3741
import org.junit.Test;
3842

3943
import org.springframework.beans.factory.BeanFactory;
44+
import org.springframework.context.ConfigurableApplicationContext;
45+
import org.springframework.context.support.ClassPathXmlApplicationContext;
4046
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
47+
import org.springframework.integration.aop.CompoundTriggerAdvice;
4148
import org.springframework.integration.aop.SimpleActiveIdleMessageSourceAdvice;
4249
import org.springframework.integration.channel.NullChannel;
43-
import org.springframework.integration.channel.QueueChannel;
4450
import org.springframework.integration.core.MessageSource;
4551
import org.springframework.integration.scheduling.PollSkipAdvice;
4652
import org.springframework.integration.scheduling.PollSkipStrategy;
53+
import org.springframework.integration.test.util.TestUtils;
54+
import org.springframework.integration.util.CompoundTrigger;
4755
import org.springframework.integration.util.DynamicPeriodicTrigger;
4856
import org.springframework.messaging.Message;
4957
import org.springframework.messaging.support.GenericMessage;
5058
import org.springframework.scheduling.Trigger;
5159
import org.springframework.scheduling.TriggerContext;
5260
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
61+
import org.springframework.scheduling.support.PeriodicTrigger;
5362

5463
/**
5564
* @author Gary Russell
@@ -221,11 +230,11 @@ public Message<Object> receive() {
221230
return m;
222231
}
223232
});
224-
QueueChannel channel = new QueueChannel();
225233
SimpleActiveIdleMessageSourceAdvice toggling = new SimpleActiveIdleMessageSourceAdvice(trigger);
226234
toggling.setActivePollPeriod(11);
227235
toggling.setIdlePollPeriod(12);
228236
adapter.setAdviceChain(Collections.singletonList(toggling));
237+
adapter.setTrigger(trigger);
229238
configure(adapter);
230239
adapter.afterPropertiesSet();
231240
adapter.start();
@@ -234,7 +243,42 @@ public Message<Object> receive() {
234243
while (triggerPeriods.size() > 5) {
235244
triggerPeriods.removeLast();
236245
}
237-
assertThat(triggerPeriods, Matchers.contains(10L, 12L, 11L, 12L, 11L));
246+
assertThat(triggerPeriods, contains(10L, 12L, 11L, 12L, 11L));
247+
}
248+
249+
@Test
250+
public void testCompoundTriggerAdvice() throws Exception {
251+
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
252+
final CountDownLatch latch = new CountDownLatch(5);
253+
final LinkedList<Object> overridePresent = new LinkedList<Object>();
254+
final CompoundTrigger compoundTrigger = new CompoundTrigger(new PeriodicTrigger(10));
255+
Trigger override = spy(new PeriodicTrigger(5));
256+
final CompoundTriggerAdvice advice = new CompoundTriggerAdvice(compoundTrigger, override);
257+
adapter.setSource(new MessageSource<Object>() {
258+
259+
@Override
260+
public Message<Object> receive() {
261+
overridePresent.add(TestUtils.getPropertyValue(compoundTrigger, "override"));
262+
Message<Object> m = null;
263+
if (latch.getCount() % 2 == 0) {
264+
m = new GenericMessage<Object>("foo");
265+
}
266+
latch.countDown();
267+
return m;
268+
}
269+
});
270+
adapter.setAdviceChain(Collections.singletonList(advice));
271+
adapter.setTrigger(compoundTrigger);
272+
configure(adapter);
273+
adapter.afterPropertiesSet();
274+
adapter.start();
275+
assertTrue(latch.await(10, TimeUnit.SECONDS));
276+
adapter.stop();
277+
while (overridePresent.size() > 5) {
278+
overridePresent.removeLast();
279+
}
280+
assertThat(overridePresent, contains(null, override, null, override, null));
281+
verify(override, atLeast(2)).nextExecutionTime(any(TriggerContext.class));
238282
}
239283

240284
private void configure(SourcePollingChannelAdapter adapter) {
@@ -245,4 +289,29 @@ private void configure(SourcePollingChannelAdapter adapter) {
245289
adapter.setTaskScheduler(scheduler);
246290
}
247291

292+
@Test
293+
public void testCompoundAdviceXML() throws Exception {
294+
ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext("compound-trigger-context.xml",
295+
getClass());
296+
SourcePollingChannelAdapter adapter = ctx.getBean(SourcePollingChannelAdapter.class);
297+
Source source = ctx.getBean(Source.class);
298+
adapter.start();
299+
assertTrue(source.latch.await(10, TimeUnit.SECONDS));
300+
assertNotNull(TestUtils.getPropertyValue(adapter, "trigger.override"));
301+
adapter.stop();
302+
ctx.close();
303+
}
304+
305+
public static class Source implements MessageSource<Object> {
306+
307+
private final CountDownLatch latch = new CountDownLatch(5);
308+
309+
@Override
310+
public Message<Object> receive() {
311+
latch.countDown();
312+
return null;
313+
}
314+
315+
}
316+
248317
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<beans xmlns="http://www.springframework.org/schema/beans"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:int="http://www.springframework.org/schema/integration"
5+
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
6+
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
7+
8+
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
9+
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
10+
<int:poller trigger="compoundTrigger">
11+
<int:advice-chain>
12+
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
13+
<constructor-arg ref="compoundTrigger"/>
14+
<constructor-arg ref="secondary"/>
15+
</bean>
16+
</int:advice-chain>
17+
</int:poller>
18+
</int:inbound-channel-adapter>
19+
20+
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
21+
<constructor-arg ref="primary" />
22+
</bean>
23+
24+
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
25+
<constructor-arg value="*/1 * * * * *" />
26+
</bean>
27+
28+
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
29+
<constructor-arg value="10" />
30+
</bean>
31+
32+
</beans>

src/reference/asciidoc/polling-consumer.adoc

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ Message<?> afterReceive(Message<?> result, MessageSource<?> source)
8484
This method is called after the `receive()` method; again, you can reconfigure the source, or take any action perhaps depending on the result (which can be `null` if there was no message created by the source).
8585
You can even return a different message!
8686

87+
.Advice Chain Ordering
88+
[IMPORTANT]
89+
=====
90+
It is important to understand how the advice chain is processed during initialization.
91+
`Advice` objects that do not extend `AbstractMessageSourceAdvice` are applied to the whole poll process and are all invoked first, in order, before any `AbstractMessageSourceAdvice`; then `AbstractMessageSourceAdvice` objects are invoked in order around the `MessageSource` `receive()` method.
92+
If you have, say `Advice` objects `a, b, c, d`, where `b` and `d` are `AbstractMessageSourceAdvice`, they will be applied in the order `a, c, b, d`.
93+
Also, if a `MessageSource` is already a `Proxy`, the `AbstractMessageSourceAdvice` will be invoked after any existing `Advice` objects.
94+
If you wish to change the order, you should wire up the proxy yourself.
95+
=====
96+
8797
===== SimpleActiveIdleMessageSourceAdvice
8898

8999
This advice is a simple implementation of `AbstractMessageSourceAdvice`, when used in conjunction with a `DynamicPeriodicTrigger`, it adjuststhe polling frequency depending on whether or not the previous poll resulted in a message or not.
@@ -95,12 +105,52 @@ This will only work if the advice is called on the poller thread.
95105
It will *not* work if the poller has a `task-executor`.
96106
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an `ExecutorChannel`.
97107

98-
.Advice Chain Ordering
99-
[IMPORTANT]
100-
=====
101-
It is important to understand how the advice chain is processed during initialization.
102-
`Advice` objects that do not extend `AbstractMessageSourceAdvice` are applied to the whole poll process and are all invoked first, in order, before any `AbstractMessageSourceAdvice`; then `AbstractMessageSourceAdvice` objects are invoked in order around the `MessageSource` `receive()` method.
103-
If you have, say `Advice` objects `a, b, c, d`, where `b` and `d` are `AbstractMessageSourceAdvice`, they will be applied in the order `a, c, b, d`.
104-
Also, if a `MessageSource` is already a `Proxy`, the `AbstractMessageSourceAdvice` will be invoked after any existing `Advice` objects.
105-
If you wish to change the order, you should wire up the proxy yourself.
106-
=====
108+
===== CompoundTriggerAdvice
109+
110+
This advice allows the selection of one of two triggers based on whether a poll returns a message or not.
111+
Consider a poller that uses a `CronTrigger`; `CronTrigger` s are immutable so cannot be altered once constructed.
112+
Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is
113+
received, poll once per minute and, when a message is retrieved, revert to using the cron expression.
114+
115+
The advice (and poller) use a `CompoundTrigger` for this purpose.
116+
The trigger's `primary` trigger can be a `CronTrigger`.
117+
When the advice detects that no message is received, it adds the secondary trigger to the `CompoundTrigger`.
118+
When the `CompoundTrigger` 's `nextExecutionTime` method is invoked, it will delegate to the secondary trigger, if
119+
present; otherwise the primary trigger.
120+
121+
The poller must also have a reference to the same `CompoundTrigger`.
122+
123+
The following shows the configuration for the hourly cron expression with fall-back to every minute...
124+
125+
[source, xml]
126+
----
127+
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
128+
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
129+
<int:poller trigger="compoundTrigger">
130+
<int:advice-chain>
131+
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
132+
<constructor-arg ref="compoundTrigger"/>
133+
<constructor-arg ref="secondary"/>
134+
</bean>
135+
</int:advice-chain>
136+
</int:poller>
137+
</int:inbound-channel-adapter>
138+
139+
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
140+
<constructor-arg ref="primary" />
141+
</bean>
142+
143+
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
144+
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
145+
</bean>
146+
147+
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
148+
<constructor-arg value="60000" />
149+
</bean>
150+
----
151+
152+
.Important: Async Handoff
153+
IMPORTANT: This advice modifies the trigger based on the `receive()` result.
154+
This will only work if the advice is called on the poller thread.
155+
It will *not* work if the poller has a `task-executor`.
156+
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an `ExecutorChannel`.

0 commit comments

Comments
 (0)