From 014d77933b742ba47208fb2054fc64b2e0f81c89 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 28 Mar 2018 14:36:16 -0400 Subject: [PATCH] AMQP-806: Simple Publisher Confirms JIRA: https://jira.spring.io/browse/AMQP-806 Avoid complex infrastructure for simple use cases. --- .../amqp/core/MessageProperties.java | 20 +++++ .../connection/CachingConnectionFactory.java | 66 +++++++++++---- .../amqp/rabbit/connection/ChannelProxy.java | 11 ++- .../rabbit/connection/ConnectionFactory.java | 11 ++- .../amqp/rabbit/core/RabbitTemplate.java | 32 +++++++- .../core/SimplePublisherConfirmsTests.java | 80 +++++++++++++++++++ src/reference/asciidoc/amqp.adoc | 35 +++++++- 7 files changed, 234 insertions(+), 21 deletions(-) create mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/SimplePublisherConfirmsTests.java diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index aacde988c..bc5fca193 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -120,6 +120,8 @@ public class MessageProperties implements Serializable { private volatile boolean finalRetryForMessageWithNoId; + private volatile long publishSequenceNumber; + private volatile transient Type inferredArgumentType; private volatile transient Method targetMethod; @@ -460,6 +462,24 @@ public void setFinalRetryForMessageWithNoId(boolean finalRetryForMessageWithNoId this.finalRetryForMessageWithNoId = finalRetryForMessageWithNoId; } + /** + * Return the publish sequence number if publisher confirms are enabled; set by the template. + * @return the sequence number. + * @since 2.1 + */ + public long getPublishSequenceNumber() { + return this.publishSequenceNumber; + } + + /** + * Set the publish sequence number, if publisher confirms are enabled; set by the template. + * @param publishSequenceNumber the sequence number. + * @since 2.1 + */ + public void setPublishSequenceNumber(long publishSequenceNumber) { + this.publishSequenceNumber = publishSequenceNumber; + } + /** * The inferred target argument type when using a method-level * {@code @RabbitListener}. diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java index d2f6f03db..8f4573def 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -142,34 +142,35 @@ public enum CacheMode { private final CachingConnectionFactory publisherConnectionFactory; - private volatile long channelCheckoutTimeout = 0; + /** Synchronization monitor for the shared Connection. */ + private final Object connectionMonitor = new Object(); - private volatile CacheMode cacheMode = CacheMode.CHANNEL; + /** Executor used for deferred close if no explicit executor set. */ + private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool(); - private volatile int channelCacheSize = DEFAULT_CHANNEL_CACHE_SIZE; + private long channelCheckoutTimeout = 0; - private volatile int connectionCacheSize = 1; + private CacheMode cacheMode = CacheMode.CHANNEL; - private volatile int connectionLimit = Integer.MAX_VALUE; + private int channelCacheSize = DEFAULT_CHANNEL_CACHE_SIZE; - private volatile boolean active = true; + private int connectionCacheSize = 1; - private volatile boolean publisherConfirms; + private int connectionLimit = Integer.MAX_VALUE; - private volatile boolean publisherReturns; + private boolean publisherConfirms; - private volatile boolean initialized; + private boolean simplePublisherConfirms; - private volatile boolean stopped; + private boolean publisherReturns; - private volatile ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger(); + private ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger(); - /** Synchronization monitor for the shared Connection. */ - private final Object connectionMonitor = new Object(); + private volatile boolean active = true; - /** Executor used for deferred close if no explicit executor set. */ - private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool(); + private volatile boolean initialized; + private volatile boolean stopped; /** * Create a new CachingConnectionFactory initializing the hostname to be the value returned from @@ -340,13 +341,39 @@ public void setPublisherReturns(boolean publisherReturns) { } } + /** + * Use full publisher confirms, with correlation data and a callback for each message. + * @param publisherConfirms true for full publisher returns, + * @since 1.1 + * @see #setSimplePublisherConfirms(boolean) + */ public void setPublisherConfirms(boolean publisherConfirms) { + Assert.isTrue(!this.simplePublisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms"); this.publisherConfirms = publisherConfirms; if (this.publisherConnectionFactory != null) { this.publisherConnectionFactory.setPublisherConfirms(publisherConfirms); } } + /** + * Use simple publisher confirms where the template simply waits for completion. + * @param simplePublisherConfirms true for confirms. + * @since 2.1 + * @see #setPublisherConfirms(boolean) + */ + public void setSimplePublisherConfirms(boolean simplePublisherConfirms) { + Assert.isTrue(!this.publisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms"); + this.simplePublisherConfirms = simplePublisherConfirms; + if (this.publisherConnectionFactory != null) { + this.publisherConnectionFactory.setSimplePublisherConfirms(simplePublisherConfirms); + } + } + + @Override + public boolean isSimplePublisherConfirms() { + return this.simplePublisherConfirms; + } + /** * Sets the channel checkout timeout. When greater than 0, enables channel limiting * in that the {@link #channelCacheSize} becomes the total number of available channels per @@ -583,7 +610,7 @@ else if (this.cacheMode == CacheMode.CONNECTION) { private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, boolean transactional) { Channel channel = connection.createBareChannel(transactional); - if (this.publisherConfirms) { + if (this.publisherConfirms || this.simplePublisherConfirms) { try { channel.confirmSelect(); } @@ -894,6 +921,8 @@ private final class CachedChannelInvocationHandler implements InvocationHandler private final boolean transactional; + private volatile boolean confirmSelected = CachingConnectionFactory.this.simplePublisherConfirms; + private volatile Channel target; private volatile boolean txStarted; @@ -956,6 +985,9 @@ else if (methodName.equals("isOpen")) { else if (methodName.equals("isTransactional")) { return this.transactional; } + else if (methodName.equals("isConfirmSelected")) { + return this.confirmSelected; + } try { if (this.target == null || !this.target.isOpen()) { if (this.target instanceof PublisherCallbackChannel) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ChannelProxy.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ChannelProxy.java index 545cffb63..36b827609 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ChannelProxy.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ChannelProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,4 +42,13 @@ public interface ChannelProxy extends Channel { */ boolean isTransactional(); + /** + * Return true if confirms are selected on this channel. + * @return true if confirms selected. + * @since 2.1 + */ + default boolean isConfirmSelected() { + return false; + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java index 4373dfa86..def7afdd9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,4 +56,13 @@ public interface ConnectionFactory { return null; } + /** + * Return true if simple publisher confirms are enabled. + * @return simplePublisherConfirms + * @since 2.1 + */ + default boolean isSimplePublisherConfirms() { + return false; + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 6fe65a864..543b0d40f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -96,6 +96,7 @@ import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.Queue.DeclareOk; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; @@ -1848,6 +1849,21 @@ private T doExecute(ChannelCallback action, ConnectionFactory connectionF @Override public T invoke(OperationsCallback action) { + return invoke(action, null, null); + } + + /** + * Invoke operations on the same channel. + * If callbacks are needed, both callbacks must be supplied. + * @param action the callback. + * @param acks a confirm callback for acks. + * @param nacks a confirm callback for nacks. + * @param the return type. + * @return the result of the action method. + */ + public T invoke(OperationsCallback action, com.rabbitmq.client.ConfirmCallback acks, + com.rabbitmq.client.ConfirmCallback nacks) { + final Channel currentChannel = this.dedicatedChannels.get(); Assert.state(currentChannel == null, () -> "Nested invoke() calls are not supported; channel '" + currentChannel + "' is already associated with this thread"); @@ -1881,10 +1897,18 @@ public T invoke(OperationsCallback action) { throw e; } } + ConfirmListener listener = null; + if (acks != null && nacks != null && channel instanceof ChannelProxy + && ((ChannelProxy) channel).isConfirmSelected()) { + listener = channel.addConfirmListener(acks, nacks); + } try { return action.doInRabbit(this); } finally { + if (listener != null) { + channel.removeConfirmListener(listener); + } this.activeTemplateCallbacks.decrementAndGet(); this.dedicatedChannels.remove(); if (resourceHolder != null) { @@ -2004,9 +2028,15 @@ private void setupConfirm(Channel channel, Message message, CorrelationData corr correlationData = this.correlationDataPostProcessor != null ? this.correlationDataPostProcessor.postProcess(message, correlationData) : correlationData; - publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(), + long nextPublishSeqNo = channel.getNextPublishSeqNo(); + message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo); + publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo, new PendingConfirm(correlationData, System.currentTimeMillis())); } + else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) { + long nextPublishSeqNo = channel.getNextPublishSeqNo(); + message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo); + } } /** diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/SimplePublisherConfirmsTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/SimplePublisherConfirmsTests.java new file mode 100644 index 000000000..bd316316a --- /dev/null +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/SimplePublisherConfirmsTests.java @@ -0,0 +1,80 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.core; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.junit.RabbitAvailable; + +/** + * @author Gary Russell + * @since 2.1 + * + */ +@RabbitAvailable(queues = SimplePublisherConfirmsTests.QUEUE) +public class SimplePublisherConfirmsTests { + + public static final String QUEUE = "simple.confirms"; + + @Test + public void testConfirms() { + CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); + cf.setSimplePublisherConfirms(true); + RabbitTemplate template = new RabbitTemplate(cf); + template.setRoutingKey(QUEUE); + assertTrue(template.invoke(t -> { + template.convertAndSend("foo"); + template.convertAndSend("bar"); + template.waitForConfirmsOrDie(10_000); + return true; + })); + cf.destroy(); + } + + @Test + public void testConfirmsWithCallbacks() { + CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); + cf.setSimplePublisherConfirms(true); + RabbitTemplate template = new RabbitTemplate(cf); + template.setRoutingKey(QUEUE); + AtomicReference finalProperties = new AtomicReference<>(); + AtomicLong lastAck = new AtomicLong(); + assertTrue(template.invoke(t -> { + template.convertAndSend("foo"); + template.convertAndSend("bar", m -> { + finalProperties.set(m.getMessageProperties()); + return m; + }); + template.waitForConfirmsOrDie(10_000); + return true; + }, (tag, multiple) -> { + lastAck.set(tag); + }, (tag, multiple) -> { })); + assertThat(lastAck.get(), equalTo(finalProperties.get().getPublishSequenceNumber())); + cf.destroy(); + } + +} diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 7e8c7a70d..2921fe100 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -578,6 +578,8 @@ When such a channel is obtained, the client can register a `PublisherCallbackCha The `PublisherCallbackChannel` implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections. +Also see `simplePublisherConfirms` in <>. + TIP: For some more background information, please see the following blog post by the RabbitMQ team titled http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/[Introducing Publisher Confirms]. [[connection-channel-listeners]] @@ -977,7 +979,7 @@ The `RabbitTemplate` now provides `waitForConfirms(long timeout)` and `waitForCo The methods cannot be used outside of that scope, for obvious reasons. Note that a higher-level abstraction which allows you to correlate confirms to requests is provided elsewhere (see <>). -You still need to set the connection factory's `publisherConfirms` property to `true` as discussed in that section, but for simple use cases where you just want to wait until all confirms are received, you can use this technique here: +If you only want to wait until the broker has confirmed delivery, you can use this simple technique: [source, java] ---- @@ -996,6 +998,37 @@ For example, when running on a transacted listener container thread and performi In that case, the operations will be performed on that channel and committed when the thread returns to the container; it is not necessary to use `invoke` in that scenario. +When using confirms in this way, much of the infrastructure set up for correlating confirms to requests is not really needed. +Starting with _version 2.1_, the connection factory supports a new property `simplePublisherConfirms`. +When this is `true` the infrastructure is avoided and the confirm processing will be more efficient. + +Furthermore, the `RabbitTemplate` sets the property `publisherSequenceNumber` in the sent message `MessageProperties`. +If you wish to check (or log, etc) specific confirmations, you can do so with an overloaded `invoke` method: + +[source, java] +---- +public T invoke(OperationsCallback action, com.rabbitmq.client.ConfirmCallback acks, + com.rabbitmq.client.ConfirmCallback nacks); +---- + +NOTE: These `ConfirmCallback` objects (for acks and nacks) are the Rabbit client callbacks, not the template callback. + +Example: + +[source, java] +---- +Collection messages = getMessagesToSend(); +Boolean result = this.template.invoke(t -> { + messages.forEach(m -> t.convertAndSend(ROUTE, m)); + t.waitForConfirmsOrDie(10_000); + return true; +}, (tag, multiple) -> { + log.info("Ack: " + tag + ":" + multiple); +}, (tag, multiple) -> { + log.info("Nack: " + tag + ":" + multiple); +})); +---- + [[template-messaging]] ===== Messaging integration