Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class MessageProperties implements Serializable {

private volatile boolean finalRetryForMessageWithNoId;

private volatile long publishSequenceNumber;

private volatile transient Type inferredArgumentType;

private volatile transient Method targetMethod;
Expand Down Expand Up @@ -460,6 +462,24 @@ public void setFinalRetryForMessageWithNoId(boolean finalRetryForMessageWithNoId
this.finalRetryForMessageWithNoId = finalRetryForMessageWithNoId;
}

/**
* Return the publish sequence number if publisher confirms are enabled; set by the template.
* @return the sequence number.
* @since 2.1
*/
public long getPublishSequenceNumber() {
return this.publishSequenceNumber;
}

/**
* Set the publish sequence number, if publisher confirms are enabled; set by the template.
* @param publishSequenceNumber the sequence number.
* @since 2.1
*/
public void setPublishSequenceNumber(long publishSequenceNumber) {
this.publishSequenceNumber = publishSequenceNumber;
}

/**
* The inferred target argument type when using a method-level
* {@code @RabbitListener}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -142,34 +142,35 @@ public enum CacheMode {

private final CachingConnectionFactory publisherConnectionFactory;

private volatile long channelCheckoutTimeout = 0;
/** Synchronization monitor for the shared Connection. */
private final Object connectionMonitor = new Object();

private volatile CacheMode cacheMode = CacheMode.CHANNEL;
/** Executor used for deferred close if no explicit executor set. */
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();

private volatile int channelCacheSize = DEFAULT_CHANNEL_CACHE_SIZE;
private long channelCheckoutTimeout = 0;

private volatile int connectionCacheSize = 1;
private CacheMode cacheMode = CacheMode.CHANNEL;

private volatile int connectionLimit = Integer.MAX_VALUE;
private int channelCacheSize = DEFAULT_CHANNEL_CACHE_SIZE;

private volatile boolean active = true;
private int connectionCacheSize = 1;

private volatile boolean publisherConfirms;
private int connectionLimit = Integer.MAX_VALUE;

private volatile boolean publisherReturns;
private boolean publisherConfirms;

private volatile boolean initialized;
private boolean simplePublisherConfirms;

private volatile boolean stopped;
private boolean publisherReturns;

private volatile ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();
private ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();

/** Synchronization monitor for the shared Connection. */
private final Object connectionMonitor = new Object();
private volatile boolean active = true;

/** Executor used for deferred close if no explicit executor set. */
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();
private volatile boolean initialized;

private volatile boolean stopped;

/**
* Create a new CachingConnectionFactory initializing the hostname to be the value returned from
Expand Down Expand Up @@ -340,13 +341,39 @@ public void setPublisherReturns(boolean publisherReturns) {
}
}

/**
* Use full publisher confirms, with correlation data and a callback for each message.
* @param publisherConfirms true for full publisher returns,
* @since 1.1
* @see #setSimplePublisherConfirms(boolean)
*/
public void setPublisherConfirms(boolean publisherConfirms) {
Assert.isTrue(!this.simplePublisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms");
this.publisherConfirms = publisherConfirms;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setPublisherConfirms(publisherConfirms);
}
}

/**
* Use simple publisher confirms where the template simply waits for completion.
* @param simplePublisherConfirms true for confirms.
* @since 2.1
* @see #setPublisherConfirms(boolean)
*/
public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
Assert.isTrue(!this.publisherConfirms, "Cannot set both publisherConfirms and simplePublisherConfirms");
this.simplePublisherConfirms = simplePublisherConfirms;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setSimplePublisherConfirms(simplePublisherConfirms);
}
}

@Override
public boolean isSimplePublisherConfirms() {
return this.simplePublisherConfirms;
}

/**
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
* in that the {@link #channelCacheSize} becomes the total number of available channels per
Expand Down Expand Up @@ -583,7 +610,7 @@ else if (this.cacheMode == CacheMode.CONNECTION) {

private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
Channel channel = connection.createBareChannel(transactional);
if (this.publisherConfirms) {
if (this.publisherConfirms || this.simplePublisherConfirms) {
try {
channel.confirmSelect();
}
Expand Down Expand Up @@ -894,6 +921,8 @@ private final class CachedChannelInvocationHandler implements InvocationHandler

private final boolean transactional;

private volatile boolean confirmSelected = CachingConnectionFactory.this.simplePublisherConfirms;

private volatile Channel target;

private volatile boolean txStarted;
Expand Down Expand Up @@ -956,6 +985,9 @@ else if (methodName.equals("isOpen")) {
else if (methodName.equals("isTransactional")) {
return this.transactional;
}
else if (methodName.equals("isConfirmSelected")) {
return this.confirmSelected;
}
try {
if (this.target == null || !this.target.isOpen()) {
if (this.target instanceof PublisherCallbackChannel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,4 +42,13 @@ public interface ChannelProxy extends Channel {
*/
boolean isTransactional();

/**
* Return true if confirms are selected on this channel.
* @return true if confirms selected.
* @since 2.1
*/
default boolean isConfirmSelected() {
return false;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,4 +56,13 @@ public interface ConnectionFactory {
return null;
}

/**
* Return true if simple publisher confirms are enabled.
* @return simplePublisherConfirms
* @since 2.1
*/
default boolean isSimplePublisherConfirms() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
Expand Down Expand Up @@ -1848,6 +1849,21 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF

@Override
public <T> T invoke(OperationsCallback<T> action) {
return invoke(action, null, null);
}

/**
* Invoke operations on the same channel.
* If callbacks are needed, both callbacks must be supplied.
* @param action the callback.
* @param acks a confirm callback for acks.
* @param nacks a confirm callback for nacks.
* @param <T> the return type.
* @return the result of the action method.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@since 2.1 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add this method to RabbitOperations - can you do that if you find no more problems in the review?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops - too late.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just add subsequent commit.
Not a big deal though.

Thanks for the pointer! My bad 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed via bf5498d

*/
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks) {

final Channel currentChannel = this.dedicatedChannels.get();
Assert.state(currentChannel == null, () -> "Nested invoke() calls are not supported; channel '" + currentChannel
+ "' is already associated with this thread");
Expand Down Expand Up @@ -1881,10 +1897,18 @@ public <T> T invoke(OperationsCallback<T> action) {
throw e;
}
}
ConfirmListener listener = null;
if (acks != null && nacks != null && channel instanceof ChannelProxy
&& ((ChannelProxy) channel).isConfirmSelected()) {
listener = channel.addConfirmListener(acks, nacks);
}
try {
return action.doInRabbit(this);
}
finally {
if (listener != null) {
channel.removeConfirmListener(listener);
}
this.activeTemplateCallbacks.decrementAndGet();
this.dedicatedChannels.remove();
if (resourceHolder != null) {
Expand Down Expand Up @@ -2004,9 +2028,15 @@ private void setupConfirm(Channel channel, Message message, CorrelationData corr
correlationData = this.correlationDataPostProcessor != null
? this.correlationDataPostProcessor.postProcess(message, correlationData)
: correlationData;
publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
long nextPublishSeqNo = channel.getNextPublishSeqNo();
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
new PendingConfirm(correlationData, System.currentTimeMillis()));
}
else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
long nextPublishSeqNo = channel.getNextPublishSeqNo();
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.core;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;

/**
* @author Gary Russell
* @since 2.1
*
*/
@RabbitAvailable(queues = SimplePublisherConfirmsTests.QUEUE)
public class SimplePublisherConfirmsTests {

public static final String QUEUE = "simple.confirms";

@Test
public void testConfirms() {
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
cf.setSimplePublisherConfirms(true);
RabbitTemplate template = new RabbitTemplate(cf);
template.setRoutingKey(QUEUE);
assertTrue(template.invoke(t -> {
template.convertAndSend("foo");
template.convertAndSend("bar");
template.waitForConfirmsOrDie(10_000);
return true;
}));
cf.destroy();
}

@Test
public void testConfirmsWithCallbacks() {
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
cf.setSimplePublisherConfirms(true);
RabbitTemplate template = new RabbitTemplate(cf);
template.setRoutingKey(QUEUE);
AtomicReference<MessageProperties> finalProperties = new AtomicReference<>();
AtomicLong lastAck = new AtomicLong();
assertTrue(template.invoke(t -> {
template.convertAndSend("foo");
template.convertAndSend("bar", m -> {
finalProperties.set(m.getMessageProperties());
return m;
});
template.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
lastAck.set(tag);
}, (tag, multiple) -> { }));
assertThat(lastAck.get(), equalTo(finalProperties.get().getPublishSequenceNumber()));
cf.destroy();
}

}
Loading