diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageHandler.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageHandler.java index 44726e304..dbcf1d546 100644 --- a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageHandler.java +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageHandler.java @@ -21,6 +21,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver; +import org.springframework.cloud.aws.messaging.listener.support.VisibilityHandlerMethodArgumentResolver; import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver; import org.springframework.cloud.aws.messaging.support.NotificationSubjectArgumentResolver; import org.springframework.cloud.aws.messaging.support.converter.ObjectMessageConverter; @@ -65,6 +66,7 @@ public class QueueMessageHandler extends AbstractMethodMessageHandler initArgumentResolvers() resolvers.add(new NotificationSubjectArgumentResolver()); resolvers.add(new AcknowledgmentHandlerMethodArgumentResolver(ACKNOWLEDGMENT)); + resolvers.add(new VisibilityHandlerMethodArgumentResolver(VISIBILITY)); CompositeMessageConverter compositeMessageConverter = createPayloadArgumentCompositeConverter(); resolvers.add(new NotificationMessageArgumentResolver(compositeMessageConverter)); diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageVisibility.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageVisibility.java new file mode 100644 index 000000000..337ba6952 --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/QueueMessageVisibility.java @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.listener; + +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; + +import java.util.concurrent.Future; + +/** + * @author Szymon Dembek + * @since 1.3 + */ +public class QueueMessageVisibility implements Visibility { + + private final AmazonSQSAsync amazonSqsAsync; + private final String queueUrl; + private final String receiptHandle; + + public QueueMessageVisibility(AmazonSQSAsync amazonSqsAsync, String queueUrl, String receiptHandle) { + this.amazonSqsAsync = amazonSqsAsync; + this.queueUrl = queueUrl; + this.receiptHandle = receiptHandle; + } + + @Override + public Future extend(int seconds) { + return amazonSqsAsync.changeMessageVisibilityAsync(new ChangeMessageVisibilityRequest() + .withQueueUrl(queueUrl) + .withReceiptHandle(receiptHandle) + .withVisibilityTimeout(seconds)); + } +} diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java index 8dbcca938..cfe3f62a1 100644 --- a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java @@ -347,6 +347,7 @@ private org.springframework.messaging.Message getMessageForExecution() { QueueMessageAcknowledgment acknowledgment = new QueueMessageAcknowledgment(SimpleMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, receiptHandle); additionalHeaders.put(QueueMessageHandler.ACKNOWLEDGMENT, acknowledgment); } + additionalHeaders.put(QueueMessageHandler.VISIBILITY, new QueueMessageVisibility(SimpleMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, this.message.getReceiptHandle())); return createMessage(this.message, additionalHeaders); } diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/Visibility.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/Visibility.java new file mode 100644 index 000000000..cb588122b --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/Visibility.java @@ -0,0 +1,40 @@ +/* + * Copyright 2013-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.listener; + +import java.util.concurrent.Future; + +/** + * Visibility interface that can be injected as parameter into a listener method. The purpose of this interface + * is to provide a way for the listener methods to extend the visibility timeout of the message being currently processed. + * + * @author Szymon Dembek + * @since 1.3 + */ +public interface Visibility { + + /** + * Allows extending the visibility timeout of a message that was already fetched from the queue, in case when + * the configured visibility timeout turns out to be to short + * + * @param seconds number of seconds to extend the visibility timeout by + * + * @return a {@link Future} as the extension can involve some asynchronous request (i.e. request to an AWS API). + */ + Future extend(int seconds); + +} diff --git a/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/support/VisibilityHandlerMethodArgumentResolver.java b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/support/VisibilityHandlerMethodArgumentResolver.java new file mode 100644 index 000000000..2dda2146c --- /dev/null +++ b/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/support/VisibilityHandlerMethodArgumentResolver.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.listener.support; + +import org.springframework.cloud.aws.messaging.listener.Visibility; +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; +import org.springframework.util.ClassUtils; + +/** + * @author Szymon Dembek + * @since 1.3 + */ +public class VisibilityHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { + private final String visibilityHeaderName; + + public VisibilityHandlerMethodArgumentResolver(String visibilityHeaderName) { + this.visibilityHeaderName = visibilityHeaderName; + } + + @Override + public boolean supportsParameter(MethodParameter parameter) { + return ClassUtils.isAssignable(Visibility.class, parameter.getParameterType()); + } + + @Override + public Object resolveArgument(MethodParameter parameter, Message message) throws Exception { + if (!message.getHeaders().containsKey(this.visibilityHeaderName) || + message.getHeaders().get(this.visibilityHeaderName) == null) { + throw new IllegalArgumentException("No visibility object found for message header: '" + this.visibilityHeaderName + "'"); + } + return message.getHeaders().get(this.visibilityHeaderName); + } +} diff --git a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainerTest.java index 3d5107020..d3ba04ec3 100644 --- a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainerTest.java @@ -19,6 +19,7 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.sqs.AmazonSQSAsync; import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; import com.amazonaws.services.sqs.model.GetQueueAttributesResult; @@ -638,6 +639,50 @@ protected void executeMessage(org.springframework.messaging.Message stri container.stop(); } + @Test + public void receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility() throws Exception { + // Arrange + final CountDownLatch countDownLatch = new CountDownLatch(1); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { + + @Override + protected void executeMessage(org.springframework.messaging.Message stringMessage) { + countDownLatch.countDown(); + super.executeMessage(stringMessage); + } + }; + + AmazonSQSAsync sqs = mock(AmazonSQSAsync.class); + container.setAmazonSqs(sqs); + + QueueMessageHandler messageHandler = new QueueMessageHandler(); + container.setMessageHandler(messageHandler); + + StaticApplicationContext applicationContext = new StaticApplicationContext(); + applicationContext.registerSingleton("testListener", TestMessageListenerWithVisibilityProlong.class); + + mockGetQueueUrl(sqs, "testQueue", "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com"); + mockGetQueueAttributesWithEmptyResult(sqs, "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com"); + + messageHandler.setApplicationContext(applicationContext); + messageHandler.afterPropertiesSet(); + container.afterPropertiesSet(); + + mockReceiveMessage(sqs, "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com", "messageContent", "ReceiptHandle"); + + // Act + container.start(); + + // Assert + countDownLatch.await(1L, TimeUnit.SECONDS); + verify(sqs, never()).changeMessageVisibilityAsync(any(ChangeMessageVisibilityRequest.class)); + TestMessageListenerWithVisibilityProlong testMessageListenerWithVisibilityProlong = applicationContext.getBean(TestMessageListenerWithVisibilityProlong.class); + testMessageListenerWithVisibilityProlong.getCountDownLatch().await(1L, TimeUnit.SECONDS); + testMessageListenerWithVisibilityProlong.extend(5); + verify(sqs, times(1)).changeMessageVisibilityAsync(eq(new ChangeMessageVisibilityRequest("http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com", "ReceiptHandle", 5))); + container.stop(); + } + @Test public void executeMessage_withDifferentDeletionPolicies_shouldActAccordingly() throws Exception { // Arrange @@ -1077,6 +1122,26 @@ public AmazonSQSAsync amazonSQS() { } + private static class TestMessageListenerWithVisibilityProlong { + private Visibility visibility; + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + @RuntimeUse + @SqsListener(value = "testQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS) + private void manualSuccess(String message, Visibility visibility) { + this.visibility = visibility; + this.countDownLatch.countDown(); + } + + public void extend(int seconds) { + this.visibility.extend(seconds); + } + + public CountDownLatch getCountDownLatch() { + return this.countDownLatch; + } + } + private static class TestMessageListenerWithManualDeletionPolicy { private Acknowledgment acknowledgment; diff --git a/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/support/VisibilityHandlerMethodArgumentResolverTest.java b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/support/VisibilityHandlerMethodArgumentResolverTest.java new file mode 100644 index 000000000..9694f4732 --- /dev/null +++ b/spring-cloud-aws-messaging/src/test/java/org/springframework/cloud/aws/messaging/listener/support/VisibilityHandlerMethodArgumentResolverTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.aws.messaging.listener.support; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * @author Szymon Dembek + * @since 1.3 + */ +public class VisibilityHandlerMethodArgumentResolverTest { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void resolveArgument_messageWithNoVisibilityHeader_throwIllegalArgumentException() throws Exception { + // Arrange + VisibilityHandlerMethodArgumentResolver visibilityHandlerMethodArgumentResolver = new VisibilityHandlerMethodArgumentResolver("Visibility"); + Message message = MessageBuilder.withPayload("no content").build(); + + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("Visibility"); + + // Act + visibilityHandlerMethodArgumentResolver.resolveArgument(null, message); + } +} \ No newline at end of file