Skip to content
This repository was archived by the owner on Jan 19, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver;
import org.springframework.cloud.aws.messaging.listener.support.VisibilityHandlerMethodArgumentResolver;
import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
import org.springframework.cloud.aws.messaging.support.NotificationSubjectArgumentResolver;
import org.springframework.cloud.aws.messaging.support.converter.ObjectMessageConverter;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class QueueMessageHandler extends AbstractMethodMessageHandler<QueueMessa

static final String LOGICAL_RESOURCE_ID = "LogicalResourceId";
static final String ACKNOWLEDGMENT = "Acknowledgment";
static final String VISIBILITY = "Visibility";
private static final boolean JACKSON_2_PRESENT = ClassUtils.isPresent(
"com.fasterxml.jackson.databind.ObjectMapper", QueueMessageHandler.class.getClassLoader());

Expand All @@ -78,6 +80,7 @@ protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers()

resolvers.add(new NotificationSubjectArgumentResolver());
resolvers.add(new AcknowledgmentHandlerMethodArgumentResolver(ACKNOWLEDGMENT));
resolvers.add(new VisibilityHandlerMethodArgumentResolver(VISIBILITY));

CompositeMessageConverter compositeMessageConverter = createPayloadArgumentCompositeConverter();
resolvers.add(new NotificationMessageArgumentResolver(compositeMessageConverter));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.listener;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;

import java.util.concurrent.Future;

/**
* @author Szymon Dembek
* @since 1.3
*/
public class QueueMessageVisibility implements Visibility {

private final AmazonSQSAsync amazonSqsAsync;
private final String queueUrl;
private final String receiptHandle;

public QueueMessageVisibility(AmazonSQSAsync amazonSqsAsync, String queueUrl, String receiptHandle) {
this.amazonSqsAsync = amazonSqsAsync;
this.queueUrl = queueUrl;
this.receiptHandle = receiptHandle;
}

@Override
public Future<?> extend(int seconds) {
return amazonSqsAsync.changeMessageVisibilityAsync(new ChangeMessageVisibilityRequest()
.withQueueUrl(queueUrl)
.withReceiptHandle(receiptHandle)
.withVisibilityTimeout(seconds));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ private org.springframework.messaging.Message<String> getMessageForExecution() {
QueueMessageAcknowledgment acknowledgment = new QueueMessageAcknowledgment(SimpleMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, receiptHandle);
additionalHeaders.put(QueueMessageHandler.ACKNOWLEDGMENT, acknowledgment);
}
additionalHeaders.put(QueueMessageHandler.VISIBILITY, new QueueMessageVisibility(SimpleMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, this.message.getReceiptHandle()));

return createMessage(this.message, additionalHeaders);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.listener;

import java.util.concurrent.Future;

/**
* Visibility interface that can be injected as parameter into a listener method. The purpose of this interface
* is to provide a way for the listener methods to extend the visibility timeout of the message being currently processed.
*
* @author Szymon Dembek
* @since 1.3
*/
public interface Visibility {

/**
* Allows extending the visibility timeout of a message that was already fetched from the queue, in case when
* the configured visibility timeout turns out to be to short
*
* @param seconds number of seconds to extend the visibility timeout by
*
* @return a {@link Future} as the extension can involve some asynchronous request (i.e. request to an AWS API).
*/
Future<?> extend(int seconds);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.listener.support;

import org.springframework.cloud.aws.messaging.listener.Visibility;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.util.ClassUtils;

/**
* @author Szymon Dembek
* @since 1.3
*/
public class VisibilityHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver {
private final String visibilityHeaderName;

public VisibilityHandlerMethodArgumentResolver(String visibilityHeaderName) {
this.visibilityHeaderName = visibilityHeaderName;
}

@Override
public boolean supportsParameter(MethodParameter parameter) {
return ClassUtils.isAssignable(Visibility.class, parameter.getParameterType());
}

@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
if (!message.getHeaders().containsKey(this.visibilityHeaderName) ||
message.getHeaders().get(this.visibilityHeaderName) == null) {
throw new IllegalArgumentException("No visibility object found for message header: '" + this.visibilityHeaderName + "'");
}
return message.getHeaders().get(this.visibilityHeaderName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
Expand Down Expand Up @@ -638,6 +639,50 @@ protected void executeMessage(org.springframework.messaging.Message<String> stri
container.stop();
}

@Test
public void receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility() throws Exception {
// Arrange
final CountDownLatch countDownLatch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {

@Override
protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
countDownLatch.countDown();
super.executeMessage(stringMessage);
}
};

AmazonSQSAsync sqs = mock(AmazonSQSAsync.class);
container.setAmazonSqs(sqs);

QueueMessageHandler messageHandler = new QueueMessageHandler();
container.setMessageHandler(messageHandler);

StaticApplicationContext applicationContext = new StaticApplicationContext();
applicationContext.registerSingleton("testListener", TestMessageListenerWithVisibilityProlong.class);

mockGetQueueUrl(sqs, "testQueue", "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com");
mockGetQueueAttributesWithEmptyResult(sqs, "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com");

messageHandler.setApplicationContext(applicationContext);
messageHandler.afterPropertiesSet();
container.afterPropertiesSet();

mockReceiveMessage(sqs, "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com", "messageContent", "ReceiptHandle");

// Act
container.start();

// Assert
countDownLatch.await(1L, TimeUnit.SECONDS);
verify(sqs, never()).changeMessageVisibilityAsync(any(ChangeMessageVisibilityRequest.class));
TestMessageListenerWithVisibilityProlong testMessageListenerWithVisibilityProlong = applicationContext.getBean(TestMessageListenerWithVisibilityProlong.class);
testMessageListenerWithVisibilityProlong.getCountDownLatch().await(1L, TimeUnit.SECONDS);
testMessageListenerWithVisibilityProlong.extend(5);
verify(sqs, times(1)).changeMessageVisibilityAsync(eq(new ChangeMessageVisibilityRequest("http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com", "ReceiptHandle", 5)));
container.stop();
}

@Test
public void executeMessage_withDifferentDeletionPolicies_shouldActAccordingly() throws Exception {
// Arrange
Expand Down Expand Up @@ -1077,6 +1122,26 @@ public AmazonSQSAsync amazonSQS() {

}

private static class TestMessageListenerWithVisibilityProlong {
private Visibility visibility;
private final CountDownLatch countDownLatch = new CountDownLatch(1);

@RuntimeUse
@SqsListener(value = "testQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
private void manualSuccess(String message, Visibility visibility) {
this.visibility = visibility;
this.countDownLatch.countDown();
}

public void extend(int seconds) {
this.visibility.extend(seconds);
}

public CountDownLatch getCountDownLatch() {
return this.countDownLatch;
}
}

private static class TestMessageListenerWithManualDeletionPolicy {

private Acknowledgment acknowledgment;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.listener.support;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

/**
* @author Szymon Dembek
* @since 1.3
*/
public class VisibilityHandlerMethodArgumentResolverTest {

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void resolveArgument_messageWithNoVisibilityHeader_throwIllegalArgumentException() throws Exception {
// Arrange
VisibilityHandlerMethodArgumentResolver visibilityHandlerMethodArgumentResolver = new VisibilityHandlerMethodArgumentResolver("Visibility");
Message<String> message = MessageBuilder.withPayload("no content").build();

this.expectedException.expect(IllegalArgumentException.class);
this.expectedException.expectMessage("Visibility");

// Act
visibilityHandlerMethodArgumentResolver.resolveArgument(null, message);
}
}