diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java index e40d16023e7..9cc44c90f5b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java @@ -25,6 +25,7 @@ import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; /** * @author Dave Syer @@ -34,6 +35,7 @@ * @since 2.0 * */ +@ManagedResource public abstract class AbstractMessageGroupStore implements MessageGroupStore, Iterable, BeanFactoryAware { @@ -153,4 +155,4 @@ private void expire(MessageGroup group) { } } -} +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index 9b5a4ea6c67..8ed914f86ba 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -25,7 +25,6 @@ import org.springframework.integration.support.locks.LockRegistry; import org.springframework.integration.util.UpperBound; import org.springframework.jmx.export.annotation.ManagedAttribute; -import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.util.Assert; @@ -43,7 +42,6 @@ * * @since 2.0 */ -@ManagedResource public class SimpleMessageStore extends AbstractMessageGroupStore implements MessageStore, ChannelMessageStore { diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java new file mode 100644 index 00000000000..47629c71f85 --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java @@ -0,0 +1,255 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import com.mongodb.DB; +import com.mongodb.MongoException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.BeansException; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.convert.TypeDescriptor; +import org.springframework.core.convert.converter.Converter; +import org.springframework.core.convert.converter.GenericConverter; +import org.springframework.core.serializer.support.DeserializingConverter; +import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.dao.DataAccessException; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.DbCallback; +import org.springframework.data.mongodb.core.FindAndModifyOptions; +import org.springframework.data.mongodb.core.IndexOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.convert.CustomConversions; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.index.Index; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Order; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.store.BasicMessageGroupStore; +import org.springframework.integration.support.DefaultMessageBuilderFactory; +import org.springframework.integration.support.MessageBuilderFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; + +/** + * The abstract MongoDB {@link BasicMessageGroupStore} implementation to provide configuration for common options + * for implementations of this class. + * + * @author Artem Bilan + * @since 4.0 + */ + +public abstract class AbstractConfigurableMongoDbMessageStore implements BasicMessageGroupStore, InitializingBean, + ApplicationContextAware { + + public final static String SEQUENCE_NAME = "messagesSequence"; + + /** + * The name of the message header that stores a flag to indicate that the message has been saved. This is an + * optimization for the put method. + */ + public static final String SAVED_KEY = "MongoDbMessageStore.SAVED"; + + /** + * The name of the message header that stores a timestamp for the time the message was inserted. + */ + public static final String CREATED_DATE_KEY = "MongoDbMessageStore.CREATED_DATE"; + + protected final Log logger = LogFactory.getLog(this.getClass()); + + protected final String collectionName; + + protected final MongoDbFactory mongoDbFactory; + + protected MongoTemplate mongoTemplate; + + protected MappingMongoConverter mappingMongoConverter; + + protected ApplicationContext applicationContext; + + protected MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); + + public AbstractConfigurableMongoDbMessageStore(MongoTemplate mongoTemplate, String collectionName) { + Assert.notNull("'mongoTemplate' must not be null"); + Assert.hasText("'collectionName' must not be empty"); + this.collectionName = collectionName; + this.mongoTemplate = mongoTemplate; + this.mongoDbFactory = null; + } + + public AbstractConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, String collectionName) { + this(mongoDbFactory, null, collectionName); + } + + public AbstractConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter, String collectionName) { + Assert.notNull("'mongoDbFactory' must not be null"); + Assert.hasText("'collectionName' must not be empty"); + this.collectionName = collectionName; + this.mongoDbFactory = mongoDbFactory; + this.mappingMongoConverter = mappingMongoConverter; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + this.messageBuilderFactory = IntegrationContextUtils.getMessageBuilderFactory(this.applicationContext); + } + + @Override + public void afterPropertiesSet() throws Exception { + if (this.mongoTemplate == null) { + if (this.mappingMongoConverter == null) { + this.mappingMongoConverter = new MappingMongoConverter(this.mongoDbFactory, new MongoMappingContext()); + this.mappingMongoConverter.setApplicationContext(this.applicationContext); + List customConverters = new ArrayList(); + customConverters.add(new MongoDbMessageBytesConverter()); + this.mappingMongoConverter.setCustomConversions(new CustomConversions(customConverters)); + this.mappingMongoConverter.afterPropertiesSet(); + } + this.mongoTemplate = new MongoTemplate(this.mongoDbFactory, this.mappingMongoConverter); + if (this.applicationContext != null) { + this.mongoTemplate.setApplicationContext(this.applicationContext); + } + } + + IndexOperations indexOperations = this.mongoTemplate.indexOps(this.collectionName); + + indexOperations.ensureIndex(new Index(MessageDocumentFields.MESSAGE_ID, Order.ASCENDING)); + + indexOperations.ensureIndex(new Index(MessageDocumentFields.GROUP_ID, Order.ASCENDING) + .on(MessageDocumentFields.LAST_MODIFIED_TIME, Order.DESCENDING) + .on(MessageDocumentFields.SEQUENCE, Order.DESCENDING)); + } + + public Message getMessage(UUID id) { + Assert.notNull(id, "'id' must not be null"); + Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id)); + MessageDocument document = this.mongoTemplate.findOne(query, MessageDocument.class, this.collectionName); + return document != null ? document.getMessage() : null; + } + + @Override + public void removeMessageGroup(Object groupId) { + this.mongoTemplate.remove(groupIdQuery(groupId), this.collectionName); + } + + @Override + public int messageGroupSize(Object groupId) { + long lCount = this.mongoTemplate.count(groupIdQuery(groupId), this.collectionName); + Assert.isTrue(lCount <= Integer.MAX_VALUE, "Message count is out of Integer's range"); + return (int) lCount; + } + + /** + * Perform MongoDB {@code INC} operation for the document, which contains the {@link MessageDocument} + * {@code sequence}, and return the new incremented value for the new {@link MessageDocument}. + * The {@link #SEQUENCE_NAME} document is created on demand. + * @return the next sequence value. + */ + protected int getNextId() { + Query query = Query.query(Criteria.where("_id").is(SEQUENCE_NAME)); + query.fields().include(MessageDocumentFields.SEQUENCE); + return (Integer) this.mongoTemplate.findAndModify(query, + new Update().inc(MessageDocumentFields.SEQUENCE, 1), + FindAndModifyOptions.options().returnNew(true).upsert(true), + Map.class, this.collectionName) + .get(MessageDocumentFields.SEQUENCE); + } + + protected void addMessageDocument(final MessageDocument document) { + this.mongoTemplate.executeInSession(new DbCallback() { + @Override + public Void doInDB(DB db) throws MongoException, DataAccessException { + Message message = document.getMessage(); + if (message.getHeaders().containsKey(SAVED_KEY)) { + Message saved = getMessage(message.getHeaders().getId()); + if (saved != null) { + if (saved.equals(message)) { + return null; + } // We need to save it under its own id + } + } + + final long createdDate = document.getCreatedTime() == 0 ? System.currentTimeMillis() : document.getCreatedTime(); + + Message result = messageBuilderFactory.fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) + .setHeader(CREATED_DATE_KEY, createdDate).build(); + + @SuppressWarnings("unchecked") + Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID since it is immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, message.getHeaders().get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, message.getHeaders().get(MessageHeaders.TIMESTAMP)); + + document.setCreatedTime(createdDate); + mongoTemplate.insert(document, collectionName); + return null; + } + }); + } + + protected static Query groupIdQuery(Object groupId) { + return Query.query(Criteria.where(MessageDocumentFields.GROUP_ID).is(groupId)); + } + + /** + * A {@link org.springframework.core.convert.converter.GenericConverter} implementation to convert {@link org.springframework.messaging.Message} to + * serialized {@link byte[]} to store {@link org.springframework.messaging.Message} to the MongoDB. + * And vice versa - to convert {@link byte[]} from the MongoDB to the {@link org.springframework.messaging.Message}. + */ + private static class MongoDbMessageBytesConverter implements GenericConverter { + + private final Converter serializingConverter = new SerializingConverter(); + + private final Converter deserializingConverter = new DeserializingConverter(); + + @Override + public Set getConvertibleTypes() { + Set convertiblePairs = new HashSet(); + convertiblePairs.add(new ConvertiblePair(Message.class, byte[].class)); + convertiblePairs.add(new ConvertiblePair(byte[].class, Message.class)); + return convertiblePairs; + } + + @Override + public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { + if (Message.class.isAssignableFrom(sourceType.getObjectType())) { + return serializingConverter.convert(source); + } + else { + return deserializingConverter.convert((byte[]) source); + } + } + + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index 9aec43fec9f..10e147bc3f7 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -13,47 +13,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.integration.mongodb.store; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Collection; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.UUID; -import org.springframework.beans.BeansException; -import org.springframework.beans.DirectFieldAccessor; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.core.convert.TypeDescriptor; -import org.springframework.core.convert.converter.Converter; -import org.springframework.core.convert.converter.GenericConverter; -import org.springframework.core.serializer.support.DeserializingConverter; -import org.springframework.core.serializer.support.SerializingConverter; -import org.springframework.data.annotation.Id; +import com.mongodb.DB; +import com.mongodb.MongoException; + +import org.springframework.dao.DataAccessException; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.MongoDbFactory; -import org.springframework.data.mongodb.core.IndexOperations; +import org.springframework.data.mongodb.core.DbCallback; import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.convert.CustomConversions; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; -import org.springframework.data.mongodb.core.index.Index; -import org.springframework.data.mongodb.core.mapping.MongoMappingContext; import org.springframework.data.mongodb.core.query.Criteria; -import org.springframework.data.mongodb.core.query.Order; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; -import org.springframework.integration.store.AbstractMessageGroupStore; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; import org.springframework.integration.store.MessageStore; import org.springframework.integration.store.SimpleMessageGroup; +import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; /** @@ -67,41 +54,14 @@ * @author Artem Bilan * @since 3.0 */ -public class ConfigurableMongoDbMessageStore extends AbstractMessageGroupStore - implements MessageStore, InitializingBean, ApplicationContextAware { +public class ConfigurableMongoDbMessageStore extends AbstractConfigurableMongoDbMessageStore + implements MessageStore, MessageGroupStore, Iterable { public final static String DEFAULT_COLLECTION_NAME = "configurableStoreMessages"; - /** - * The name of the message header that stores a flag to indicate that the message has been saved. This is an - * optimization for the put method. - */ - public static final String SAVED_KEY = ConfigurableMongoDbMessageStore.class.getSimpleName() + ".SAVED"; - - /** - * The name of the message header that stores a timestamp for the time the message was inserted. - */ - public static final String CREATED_DATE_KEY = ConfigurableMongoDbMessageStore.class.getSimpleName() + ".CREATED_DATE"; - - private static final String MESSAGE_ID = "messageId"; - - private static final String GROUP_ID = "groupId"; - - private static final String LAST_MODIFIED_TIME = "lastModifiedTime"; - - private static final String LAST_RELEASED_SEQUENCE = "lastReleasedSequence"; - - private static final String COMPLETE = "complete"; - - private final String collectionName; - - private final MongoDbFactory mongoDbFactory; - - private volatile MongoTemplate mongoTemplate; + private final Collection expiryCallbacks = new LinkedHashSet(); - private volatile MappingMongoConverter mappingMongoConverter; - - private ApplicationContext applicationContext; + private volatile boolean timeoutOnIdle; public ConfigurableMongoDbMessageStore(MongoTemplate mongoTemplate) { @@ -109,11 +69,7 @@ public ConfigurableMongoDbMessageStore(MongoTemplate mongoTemplate) { } public ConfigurableMongoDbMessageStore(MongoTemplate mongoTemplate, String collectionName) { - Assert.notNull("'mongoTemplate' must not be null"); - Assert.hasText("'collectionName' must not be empty"); - this.collectionName = collectionName; - this.mongoTemplate = mongoTemplate; - this.mongoDbFactory = null; + super(mongoTemplate, collectionName); } public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory) { @@ -129,47 +85,35 @@ public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, String col } public ConfigurableMongoDbMessageStore(MongoDbFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter, String collectionName) { - Assert.notNull("'mongoDbFactory' must not be null"); - Assert.hasText("'collectionName' must not be empty"); - this.collectionName = collectionName; - this.mongoDbFactory = mongoDbFactory; - this.mappingMongoConverter = mappingMongoConverter; - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; + super(mongoDbFactory, mappingMongoConverter, collectionName); } - @Override - public void afterPropertiesSet() throws Exception { - if (this.mongoTemplate == null) { - if (this.mappingMongoConverter == null) { - this.mappingMongoConverter = new MappingMongoConverter(this.mongoDbFactory, new MongoMappingContext()); - this.mappingMongoConverter.setApplicationContext(this.applicationContext); - List customConverters = new ArrayList(); - customConverters.add(new MongoDbMessageBytesConverter()); - this.mappingMongoConverter.setCustomConversions(new CustomConversions(customConverters)); - this.mappingMongoConverter.afterPropertiesSet(); - } - this.mongoTemplate = new MongoTemplate(this.mongoDbFactory, this.mappingMongoConverter); - if (this.applicationContext != null) { - this.mongoTemplate.setApplicationContext(this.applicationContext); - } + /** + * Convenient injection point for expiry callbacks in the message store. Each of the callbacks provided will simply + * be registered with the store using {@link #registerMessageGroupExpiryCallback(MessageGroupCallback)}. + * + * @param expiryCallbacks the expiry callbacks to add + */ + public void setExpiryCallbacks(Collection expiryCallbacks) { + for (MessageGroupCallback callback : expiryCallbacks) { + registerMessageGroupExpiryCallback(callback); } - - IndexOperations indexOperations = this.mongoTemplate.indexOps(this.collectionName); - indexOperations.ensureIndex(new Index(MESSAGE_ID, Order.ASCENDING)); - indexOperations.ensureIndex(new Index(GROUP_ID, Order.ASCENDING).on(LAST_MODIFIED_TIME, Order.DESCENDING)); } + public boolean isTimeoutOnIdle() { + return timeoutOnIdle; + } - @Override - public Message getMessage(UUID id) { - Assert.notNull(id, "'id' must not be null"); - MessageDocument document = this.mongoTemplate.findOne(Query.query(Criteria.where(MESSAGE_ID).is(id)), - MessageDocument.class, this.collectionName); - return (document != null) ? document.getMessage() : null; + /** + * Allows you to override the rule for the timeout calculation. Typical timeout is based from the time + * the {@link MessageGroup} was created. If you want the timeout to be based on the time + * the {@link MessageGroup} was idling (e.g., inactive from the last update) invoke this method with 'true'. + * Default is 'false'. + * + * @param timeoutOnIdle The boolean. + */ + public void setTimeoutOnIdle(boolean timeoutOnIdle) { + this.timeoutOnIdle = timeoutOnIdle; } @Override @@ -179,56 +123,28 @@ public Message addMessage(Message message) { return message; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void addMessageDocument(MessageDocument document) { - Message message = document.getMessage(); - if (message.getHeaders().containsKey(SAVED_KEY)) { - Message saved = getMessage(message.getHeaders().getId()); - if (saved != null) { - if (saved.equals(message)) { - return; - } // We need to save it under its own id - } - } - - final long createdDate = document.getCreatedTime() == 0 ? System.currentTimeMillis() : document.getCreatedTime(); - - Message result = this.getMessageBuilderFactory().fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) - .setHeader(CREATED_DATE_KEY, createdDate).build(); - - Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); - // using reflection to set ID since it is immutable through MessageHeaders - innerMap.put(MessageHeaders.ID, message.getHeaders().get(MessageHeaders.ID)); - innerMap.put(MessageHeaders.TIMESTAMP, message.getHeaders().get(MessageHeaders.TIMESTAMP)); - - document.setCreatedTime(createdDate); - this.mongoTemplate.insert(document, this.collectionName); - } - @Override public Message removeMessage(UUID id) { Assert.notNull(id, "'id' must not be null"); - MessageDocument document = this.mongoTemplate.findAndRemove(Query.query(Criteria.where(MESSAGE_ID).is(id)), - MessageDocument.class, this.collectionName); + Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id)); + MessageDocument document = this.mongoTemplate.findAndRemove(query, MessageDocument.class, this.collectionName); return (document != null) ? document.getMessage() : null; } @Override public long getMessageCount() { - return this.mongoTemplate.getCollection(this.collectionName).getCount(); + Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).exists(true) + .and(MessageDocumentFields.GROUP_ID).exists(false)); + return this.mongoTemplate.getCollection(this.collectionName).count(query.getQueryObject()); } - @Override - public int messageGroupSize(Object groupId) { - long lCount = this.mongoTemplate.count(groupIdQuery(groupId), this.collectionName); - Assert.isTrue(lCount <= Integer.MAX_VALUE, "Message count is out of Integer's range"); - return (int) lCount; - } - @Override public MessageGroup getMessageGroup(Object groupId) { - List messageDocuments = this.mongoTemplate.find(groupIdQuery(groupId), MessageDocument.class, + Assert.notNull(groupId, "'groupId' must not be null"); + + Query query = groupOrderQuery(groupId); + List messageDocuments = this.mongoTemplate.find(query, MessageDocument.class, this.collectionName); long createdTime = 0; @@ -256,209 +172,189 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - public MessageGroup addMessageToGroup(Object groupId, Message message) { + public MessageGroup addMessageToGroup(final Object groupId, final Message message) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(message, "'message' must not be null"); - MessageDocument messageDocument = this.mongoTemplate.findOne(groupIdQuery(groupId), MessageDocument.class, - this.collectionName); - long createdTime = 0; - int lastReleasedSequence = 0; - boolean complete = false; + return this.mongoTemplate.executeInSession(new DbCallback() { - if (messageDocument != null) { - createdTime = messageDocument.getCreatedTime(); - lastReleasedSequence = messageDocument.getLastReleasedSequence(); - complete = messageDocument.isComplete(); - } + @Override + public MessageGroup doInDB(DB db) throws MongoException, DataAccessException { + Query query = groupOrderQuery(groupId); + MessageDocument messageDocument = mongoTemplate.findOne(query, MessageDocument.class, collectionName); + + long createdTime = 0; + int lastReleasedSequence = 0; + boolean complete = false; + + if (messageDocument != null) { + createdTime = messageDocument.getCreatedTime(); + lastReleasedSequence = messageDocument.getLastReleasedSequence(); + complete = messageDocument.isComplete(); + } - MessageDocument document = new MessageDocument(message); - document.setGroupId(groupId); - document.setComplete(complete); - document.setLastReleasedSequence(lastReleasedSequence); - document.setCreatedTime(createdTime == 0 ? System.currentTimeMillis() : createdTime); - document.setLastModifiedTime(System.currentTimeMillis()); + MessageDocument document = new MessageDocument(message); + document.setGroupId(groupId); + document.setComplete(complete); + document.setLastReleasedSequence(lastReleasedSequence); + document.setCreatedTime(createdTime == 0 ? System.currentTimeMillis() : createdTime); + document.setLastModifiedTime(System.currentTimeMillis()); + document.setSequence(getNextId()); - this.addMessageDocument(document); + addMessageDocument(document); - return this.getMessageGroup(groupId); + return getMessageGroup(groupId); + } + }); } @Override - public MessageGroup removeMessageFromGroup(Object groupId, Message messageToRemove) { + public MessageGroup removeMessageFromGroup(final Object groupId, final Message messageToRemove) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messageToRemove, "'messageToRemove' must not be null"); - Query query = groupIdQuery(groupId).addCriteria(Criteria.where(MESSAGE_ID).is(messageToRemove.getHeaders().getId())); - this.mongoTemplate.remove(query, this.collectionName); - this.updateGroup(groupId, lastModifiedUpdate()); - return this.getMessageGroup(groupId); - } - @Override - public void removeMessageGroup(Object groupId) { - this.mongoTemplate.remove(groupIdQuery(groupId), this.collectionName); - } + return this.mongoTemplate.executeInSession(new DbCallback() { - @Override - @SuppressWarnings({ "rawtypes" }) - public Iterator iterator() { - Map messageGroupMap = new HashMap(); - Query query = Query.query(Criteria.where(GROUP_ID).exists(true)); - query.fields().include(GROUP_ID); - List groupIds = this.mongoTemplate.find(query, Map.class, this.collectionName); - for (Map groupId : groupIds) { - Object key = groupId.get(GROUP_ID); - if (!messageGroupMap.containsKey(key)) { - messageGroupMap.put(key, this.getMessageGroup(groupId)); + @Override + public MessageGroup doInDB(DB db) throws MongoException, DataAccessException { + Query query = groupIdQuery(groupId) + .addCriteria(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(messageToRemove.getHeaders().getId())); + mongoTemplate.remove(query, collectionName); + updateGroup(groupId, lastModifiedUpdate()); + return getMessageGroup(groupId); } - } - return messageGroupMap.values().iterator(); + }); } @Override - public Message pollMessageFromGroup(Object groupId) { + public Message pollMessageFromGroup(final Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); - Query query = groupIdQuery(groupId).with(new Sort(Sort.Direction.ASC, LAST_MODIFIED_TIME)); - MessageDocument document = this.mongoTemplate.findAndRemove(query, MessageDocument.class, this.collectionName); - Message message = null; - if (document != null) { - message = document.getMessage(); - this.updateGroup(groupId, lastModifiedUpdate()); - } - return message; + + return this.mongoTemplate.executeInSession(new DbCallback>() { + + @Override + public Message doInDB(DB db) throws MongoException, DataAccessException { + Sort sort = new Sort(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); + Query query = groupIdQuery(groupId).with(sort); + MessageDocument document = mongoTemplate.findAndRemove(query, MessageDocument.class, collectionName); + Message message = null; + if (document != null) { + message = document.getMessage(); + updateGroup(groupId, lastModifiedUpdate()); + } + return message; + } + }); } @Override public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { - this.updateGroup(groupId, lastModifiedUpdate().set(LAST_RELEASED_SEQUENCE, sequenceNumber)); + this.updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber)); } @Override public void completeGroup(Object groupId) { - this.updateGroup(groupId, lastModifiedUpdate().set(COMPLETE, true)); + this.updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true)); } + @Override + public Iterator iterator() { + return this.mongoTemplate.executeInSession(new DbCallback>() { - private void updateGroup(Object groupId, Update update) { - this.mongoTemplate.updateFirst(groupIdQuery(groupId), update, this.collectionName); - } + @Override + public Iterator doInDB(DB db) throws MongoException, DataAccessException { + List messageGroups = new ArrayList(); - private static Update lastModifiedUpdate() { - return Update.update(LAST_MODIFIED_TIME, System.currentTimeMillis()); - } + Query query = Query.query(Criteria.where(MessageDocumentFields.GROUP_ID).exists(true)); + @SuppressWarnings("rawtypes") + List groupIds = mongoTemplate.getCollection(collectionName) + .distinct(MessageDocumentFields.GROUP_ID, query.getQueryObject()); + for (Object groupId : groupIds) { + messageGroups.add(getMessageGroup(groupId)); + } - private static Query groupIdQuery(Object groupId) { - return Query.query(Criteria.where(GROUP_ID).is(groupId)); + return messageGroups.iterator(); + } + }); } - /** - * The entity class to wrap {@link Message} to the MongoDB document. - */ - private static class MessageDocument { - - /* - * Needed as a persistence property to suppress 'Cannot determine IsNewStrategy' MappingException - * when the application context is configured with auditing. The document is not - * currently Auditable. - */ - @SuppressWarnings("unused") - @Id - private String _id; - - private final Message message; - - @SuppressWarnings("unused") - private final UUID messageId; - - private volatile Long createdTime = 0L; - - @SuppressWarnings("unused") - private volatile Object groupId; - - private volatile Long lastModifiedTime = 0L; - - private volatile Boolean complete = false; - - private volatile Integer lastReleasedSequence = 0; - - public MessageDocument(Message message) { - Assert.notNull(message, "'message' must not be null"); - this.message = message; - this.messageId = message.getHeaders().getId(); - } - - public Message getMessage() { - return message; - } - - public void setGroupId(Object groupId) { - this.groupId = groupId; - } + @Override + public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) { + expiryCallbacks.add(callback); + } - public Long getLastModifiedTime() { - return lastModifiedTime; - } + @Override + public int expireMessageGroups(long timeout) { + int count = 0; + long threshold = System.currentTimeMillis() - timeout; + for (MessageGroup group : this) { + + long timestamp = group.getTimestamp(); + if (this.isTimeoutOnIdle() && group.getLastModified() > 0) { + timestamp = group.getLastModified(); + } - public void setLastModifiedTime(long lastModifiedTime) { - this.lastModifiedTime = lastModifiedTime; + if (timestamp <= threshold) { + count++; + expire(group); + } } + return count; + } - public Long getCreatedTime() { - return createdTime; - } + @Override + @ManagedAttribute + public int getMessageCountForAllMessageGroups() { + Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).exists(true) + .and(MessageDocumentFields.GROUP_ID).exists(true)); + long count = this.mongoTemplate.count(query, this.collectionName); + Assert.isTrue(count <= Integer.MAX_VALUE, "Message count is out of Integer's range"); + return (int) count; + } - public void setCreatedTime(long createdTime) { - this.createdTime = createdTime; - } + @Override + @ManagedAttribute + public int getMessageGroupCount() { + Query query = Query.query(Criteria.where(MessageDocumentFields.GROUP_ID).exists(true)); + return this.mongoTemplate.getCollection(this.collectionName) + .distinct(MessageDocumentFields.GROUP_ID, query.getQueryObject()) + .size(); + } - public Boolean isComplete() { - return complete; - } + private void expire(MessageGroup group) { - public void setComplete(boolean complete) { - this.complete = complete; - } + RuntimeException exception = null; - public Integer getLastReleasedSequence() { - return lastReleasedSequence; + for (MessageGroupCallback callback : expiryCallbacks) { + try { + callback.execute(this, group); + } + catch (RuntimeException e) { + if (exception == null) { + exception = e; + } + logger.error("Exception in expiry callback", e); + } } - public void setLastReleasedSequence(int lastReleasedSequence) { - this.lastReleasedSequence = lastReleasedSequence; + if (exception != null) { + throw exception; } - } - /** - * A {@link GenericConverter} implementation to convert {@link Message} to - * serialized {@link byte[]} to store {@link Message} to the MongoDB. - * And vice versa - to convert {@link byte[]} from the MongoDB to the {@link Message}. - */ - private static class MongoDbMessageBytesConverter implements GenericConverter { - private final Converter serializingConverter = new SerializingConverter(); - - private final Converter deserializingConverter = new DeserializingConverter(); - - @Override - public Set getConvertibleTypes() { - Set convertiblePairs = new HashSet(); - convertiblePairs.add(new ConvertiblePair(Message.class, byte[].class)); - convertiblePairs.add(new ConvertiblePair(byte[].class, Message.class)); - return convertiblePairs; - } + private void updateGroup(Object groupId, Update update) { + this.mongoTemplate.updateFirst(groupOrderQuery(groupId), update, this.collectionName); + } - @Override - public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { - if (Message.class.isAssignableFrom(sourceType.getObjectType())) { - return serializingConverter.convert(source); - } - else { - return deserializingConverter.convert((byte[]) source); - } - } + private static Update lastModifiedUpdate() { + return Update.update(MessageDocumentFields.LAST_MODIFIED_TIME, System.currentTimeMillis()); + } + private static Query groupOrderQuery(Object groupId) { + Sort sort = new Sort(Sort.Direction.DESC, MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); + return groupIdQuery(groupId).with(sort); } } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MessageDocument.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MessageDocument.java new file mode 100644 index 00000000000..9cc3e092de9 --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MessageDocument.java @@ -0,0 +1,120 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store; + +import java.util.UUID; + +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * The entity class to wrap {@link org.springframework.messaging.Message} to the MongoDB document. + * + * @author Artem Bilan + * @since 4.0 + */ +@Document +public class MessageDocument { + + /* + * Needed as a persistence property to suppress 'Cannot determine IsNewStrategy' MappingException + * when the application context is configured with auditing. The document is not + * currently Auditable. + */ + @SuppressWarnings("unused") + @Id + private String _id; + + private final Message message; + + @SuppressWarnings("unused") + private final UUID messageId; + + @SuppressWarnings("unused") + private Integer priority; + + private Long createdTime = 0L; + + @SuppressWarnings("unused") + private Object groupId; + + private Long lastModifiedTime = 0L; + + private Boolean complete = false; + + private Integer lastReleasedSequence = 0; + + @SuppressWarnings("unused") + private int sequence; + + public MessageDocument(Message message) { + Assert.notNull(message, "'message' must not be null"); + this.message = message; + this.messageId = message.getHeaders().getId(); + } + + public Message getMessage() { + return message; + } + + public void setGroupId(Object groupId) { + this.groupId = groupId; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + + public Long getLastModifiedTime() { + return lastModifiedTime; + } + + public void setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + } + + public Long getCreatedTime() { + return createdTime; + } + + public void setCreatedTime(long createdTime) { + this.createdTime = createdTime; + } + + public Boolean isComplete() { + return complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + public Integer getLastReleasedSequence() { + return lastReleasedSequence; + } + + public void setLastReleasedSequence(int lastReleasedSequence) { + this.lastReleasedSequence = lastReleasedSequence; + } + + public void setSequence(int sequence) { + this.sequence = sequence; + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MessageDocumentFields.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MessageDocumentFields.java new file mode 100644 index 00000000000..331590d86c3 --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MessageDocumentFields.java @@ -0,0 +1,43 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store; + +/** + * @author Artem Bilan + * + * @since 4.0 + */ +public final class MessageDocumentFields { + + public static final String MESSAGE_ID = "messageId"; + + public static final String PRIORITY = "priority"; + + public static final String GROUP_ID = "groupId"; + + public static final String LAST_MODIFIED_TIME = "lastModifiedTime"; + + public static final String SEQUENCE = "sequence"; + + public static final String LAST_RELEASED_SEQUENCE = "lastReleasedSequence"; + + public static final String COMPLETE = "complete"; + + private MessageDocumentFields() { + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java new file mode 100644 index 00000000000..b9d6d4ff01e --- /dev/null +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java @@ -0,0 +1,143 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mongodb.store; + +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.index.Index; +import org.springframework.data.mongodb.core.query.Order; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.store.MessageGroup; +import org.springframework.integration.store.PriorityCapableChannelMessageStore; +import org.springframework.integration.store.SimpleMessageGroup; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * MongoDB {@link PriorityCapableChannelMessageStore} implementation. + * This message store shall be used for message channels only. + * + *

Provide the {@link #priorityEnabled} option to allow to poll messages via {@code priority} manner. + * + *

As a priority document field the {@link org.springframework.integration.IntegrationMessageHeaderAccessor#PRIORITY} + * message header is used. + * + *

The same collection can be used for {@link org.springframework.integration.channel.QueueChannel}s and + * {@link org.springframework.integration.channel.PriorityChannel}s, but the different instances of + * {@link MongoDbChannelMessageStore} should be used for those cases, and the last one with + * {@code priorityEnabled = true} option. + * + * @author Artem Bilan + * @since 4.0 + */ +public class MongoDbChannelMessageStore extends AbstractConfigurableMongoDbMessageStore + implements PriorityCapableChannelMessageStore { + + public final static String DEFAULT_COLLECTION_NAME = "channelMessages"; + + private volatile boolean priorityEnabled; + + public MongoDbChannelMessageStore(MongoTemplate mongoTemplate) { + this(mongoTemplate, DEFAULT_COLLECTION_NAME); + } + + public MongoDbChannelMessageStore(MongoTemplate mongoTemplate, String collectionName) { + super(mongoTemplate, collectionName); + } + + public MongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) { + this(mongoDbFactory, null, DEFAULT_COLLECTION_NAME); + } + + public MongoDbChannelMessageStore(MongoDbFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter) { + this(mongoDbFactory, mappingMongoConverter, DEFAULT_COLLECTION_NAME); + } + + public MongoDbChannelMessageStore(MongoDbFactory mongoDbFactory, String collectionName) { + this(mongoDbFactory, null, collectionName); + } + + public MongoDbChannelMessageStore(MongoDbFactory mongoDbFactory, MappingMongoConverter mappingMongoConverter, String collectionName) { + super(mongoDbFactory, mappingMongoConverter, collectionName); + } + + public void setPriorityEnabled(boolean priorityEnabled) { + this.priorityEnabled = priorityEnabled; + } + + @Override + public boolean isPriorityEnabled() { + return this.priorityEnabled; + } + + @Override + public void afterPropertiesSet() throws Exception { + super.afterPropertiesSet(); + this.mongoTemplate.indexOps(this.collectionName) + .ensureIndex(new Index(MessageDocumentFields.GROUP_ID, Order.ASCENDING) + .on(MessageDocumentFields.PRIORITY, Order.DESCENDING) + .on(MessageDocumentFields.LAST_MODIFIED_TIME, Order.ASCENDING) + .on(MessageDocumentFields.SEQUENCE, Order.ASCENDING)); + } + + @Override + public MessageGroup addMessageToGroup(Object groupId, Message message) { + Assert.notNull(groupId, "'groupId' must not be null"); + Assert.notNull(message, "'message' must not be null"); + + MessageDocument document = new MessageDocument(message); + document.setGroupId(groupId); + document.setCreatedTime(System.currentTimeMillis()); + document.setLastModifiedTime(System.currentTimeMillis()); + if (this.priorityEnabled) { + document.setPriority(new IntegrationMessageHeaderAccessor(message).getPriority()); + } + document.setSequence(this.getNextId()); + + this.addMessageDocument(document); + return this.getMessageGroup(groupId); + } + + /** + * Not fully used. Only wraps the provided group id. + */ + @Override + public MessageGroup getMessageGroup(Object groupId) { + return new SimpleMessageGroup(groupId); + } + + @Override + public Message pollMessageFromGroup(Object groupId) { + Assert.notNull(groupId, "'groupId' must not be null"); + + Sort sort = new Sort(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); + if (this.priorityEnabled) { + sort = new Sort(Sort.Direction.DESC, MessageDocumentFields.PRIORITY).and(sort); + } + Query query = groupIdQuery(groupId).with(sort); + MessageDocument document = this.mongoTemplate.findAndRemove(query, MessageDocument.class, this.collectionName); + Message message = null; + if (document != null) { + message = document.getMessage(); + } + return message; + } + +} diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index e050d6782fd..1e105dbfb6a 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -16,11 +16,6 @@ package org.springframework.integration.mongodb.store; -import static org.springframework.data.mongodb.core.query.Criteria.where; -import static org.springframework.integration.history.MessageHistory.NAME_PROPERTY; -import static org.springframework.integration.history.MessageHistory.TIMESTAMP_PROPERTY; -import static org.springframework.integration.history.MessageHistory.TYPE_PROPERTY; - import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -31,6 +26,12 @@ import java.util.Set; import java.util.UUID; +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.DBObject; +import com.mongodb.MongoException; + import org.springframework.beans.BeansException; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanClassLoaderAware; @@ -42,19 +43,25 @@ import org.springframework.core.convert.converter.GenericConverter; import org.springframework.core.serializer.support.DeserializingConverter; import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.dao.DataAccessException; import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Transient; import org.springframework.data.convert.WritingConverter; import org.springframework.data.domain.Sort; -import org.springframework.data.domain.Sort.Direction; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.DbCallback; +import org.springframework.data.mongodb.core.FindAndModifyOptions; +import org.springframework.data.mongodb.core.IndexOperations; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.convert.CustomConversions; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.index.Index; import org.springframework.data.mongodb.core.mapping.MongoMappingContext; import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Order; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.integration.history.MessageHistory; @@ -74,10 +81,6 @@ import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; - /** * An implementation of both the {@link MessageStore} and {@link MessageGroupStore} @@ -96,6 +99,19 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore private final static String DEFAULT_COLLECTION_NAME = "messages"; + public final static String SEQUENCE_NAME = "messagesSequence"; + + /** + * The name of the message header that stores a flag to indicate that the message has been saved. This is an + * optimization for the put method. + */ + public static final String SAVED_KEY = ConfigurableMongoDbMessageStore.class.getSimpleName() + ".SAVED"; + + /** + * The name of the message header that stores a timestamp for the time the message was inserted. + */ + public static final String CREATED_DATE_KEY = ConfigurableMongoDbMessageStore.class.getSimpleName() + ".CREATED_DATE"; + private final static String GROUP_ID_KEY = "_groupId"; private final static String GROUP_COMPLETE_KEY = "_group_complete"; @@ -108,6 +124,8 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore private final static String CREATED_DATE = "_createdDate"; + private static final String SEQUENCE = "sequence"; + private final MongoTemplate template; @@ -161,15 +179,53 @@ public void afterPropertiesSet() throws Exception { this.converter.setApplicationContext(this.applicationContext); } this.converter.afterPropertiesSet(); + + IndexOperations indexOperations = this.template.indexOps(this.collectionName); + + indexOperations.ensureIndex(new Index(GROUP_ID_KEY, Order.ASCENDING) + .on(GROUP_UPDATE_TIMESTAMP_KEY, Order.DESCENDING) + .on(SEQUENCE, Order.DESCENDING)); } @Override public Message addMessage(Message message) { Assert.notNull(message, "'message' must not be null"); - this.template.insert(new MessageWrapper(message), this.collectionName); + this.addMessageDocument(new MessageWrapper(message)); return message; } + private void addMessageDocument(final MessageWrapper document) { + this.template.executeInSession(new DbCallback() { + @Override + public Void doInDB(DB db) throws MongoException, DataAccessException { + Message message = document.getMessage(); + if (message.getHeaders().containsKey(SAVED_KEY)) { + Message saved = getMessage(message.getHeaders().getId()); + if (saved != null) { + if (saved.equals(message)) { + return null; + } // We need to save it under its own id + } + } + + final long createdDate = document.get_Group_timestamp() == 0 ? System.currentTimeMillis() : document.get_Group_timestamp(); + + Message result = getMessageBuilderFactory().fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) + .setHeader(CREATED_DATE_KEY, createdDate).build(); + + @SuppressWarnings("unchecked") + Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID since it is immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, message.getHeaders().get(MessageHeaders.ID)); + innerMap.put(MessageHeaders.TIMESTAMP, message.getHeaders().get(MessageHeaders.TIMESTAMP)); + + document.set_Group_timestamp(createdDate); + template.insert(document, collectionName); + return null; + } + }); + } + @Override public Message getMessage(UUID id) { Assert.notNull(id, "'id' must not be null"); @@ -193,16 +249,17 @@ public Message removeMessage(UUID id) { @Override public MessageGroup getMessageGroup(Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); - List messageWrappers = this.template.find(whereGroupIdIs(groupId), MessageWrapper.class, this.collectionName); + Query query = whereGroupIdOrder(groupId); + List messageWrappers = this.template.find(query, MessageWrapper.class, this.collectionName); List> messages = new ArrayList>(); long timestamp = 0; - long lastmodified = 0; + long lastModified = 0; int lastReleasedSequenceNumber = 0; boolean completeGroup = false; if (messageWrappers.size() > 0){ MessageWrapper messageWrapper = messageWrappers.get(0); timestamp = messageWrapper.get_Group_timestamp(); - lastmodified = messageWrapper.get_Group_update_timestamp(); + lastModified = messageWrapper.get_Group_update_timestamp(); completeGroup = messageWrapper.get_Group_complete(); lastReleasedSequenceNumber = messageWrapper.get_LastReleasedSequenceNumber(); } @@ -212,7 +269,7 @@ public MessageGroup getMessageGroup(Object groupId) { } SimpleMessageGroup messageGroup = new SimpleMessageGroup(messages, groupId, timestamp, completeGroup); - messageGroup.setLastModified(lastmodified); + messageGroup.setLastModified(lastModified); if (lastReleasedSequenceNumber > 0){ messageGroup.setLastReleasedMessageSequenceNumber(lastReleasedSequenceNumber); } @@ -221,97 +278,140 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - public MessageGroup addMessageToGroup(Object groupId, Message message) { + public MessageGroup addMessageToGroup(final Object groupId, final Message message) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(message, "'message' must not be null"); - MessageGroup messageGroup = this.getMessageGroup(groupId); + return this.template.executeInSession(new DbCallback() { - long messageGroupTimestamp = messageGroup.getTimestamp(); - long lastModified = messageGroup.getLastModified(); + @Override + public MessageGroup doInDB(DB db) throws MongoException, DataAccessException { + Query query = whereGroupIdOrder(groupId); + MessageWrapper messageDocument = template.findOne(query, MessageWrapper.class, collectionName); - if (messageGroupTimestamp == 0){ - messageGroupTimestamp = System.currentTimeMillis(); - lastModified = messageGroupTimestamp; - } - else { - lastModified = System.currentTimeMillis(); - } + long createdTime = 0; + int lastReleasedSequence = 0; + boolean complete = false; + + if (messageDocument != null) { + createdTime = messageDocument.get_Group_timestamp(); + lastReleasedSequence = messageDocument.get_LastReleasedSequenceNumber(); + complete = messageDocument.get_Group_complete(); + } - MessageWrapper wrapper = new MessageWrapper(message); - wrapper.set_GroupId(groupId); - wrapper.set_Group_timestamp(messageGroupTimestamp); - wrapper.set_Group_update_timestamp(lastModified); - wrapper.set_Group_complete(messageGroup.isComplete()); - wrapper.set_LastReleasedSequenceNumber(messageGroup.getLastReleasedMessageSequenceNumber()); - this.template.insert(wrapper, this.collectionName); - return this.getMessageGroup(groupId); + MessageWrapper wrapper = new MessageWrapper(message); + wrapper.set_GroupId(groupId); + wrapper.set_Group_timestamp(createdTime == 0 ? System.currentTimeMillis() : createdTime); + wrapper.set_Group_update_timestamp(System.currentTimeMillis()); + wrapper.set_Group_complete(complete); + wrapper.set_LastReleasedSequenceNumber(lastReleasedSequence); + wrapper.setSequence(getNextId()); + + addMessageDocument(wrapper); + return getMessageGroup(groupId); + } + }); } @Override - public MessageGroup removeMessageFromGroup(Object groupId, Message messageToRemove) { + public MessageGroup removeMessageFromGroup(final Object groupId, final Message messageToRemove) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messageToRemove, "'messageToRemove' must not be null"); - this.template.findAndRemove(whereMessageIdIsAndGroupIdIs( - messageToRemove.getHeaders().getId(), groupId), MessageWrapper.class, this.collectionName); - this.updateGroup(groupId); - return this.getMessageGroup(groupId); + + return this.template.executeInSession(new DbCallback() { + + @Override + public MessageGroup doInDB(DB db) throws MongoException, DataAccessException { + template.findAndRemove(whereMessageIdIsAndGroupIdIs(messageToRemove.getHeaders().getId(), groupId), + MessageWrapper.class, collectionName); + updateGroup(groupId, lastModifiedUpdate()); + return getMessageGroup(groupId); + } + }); } @Override public void removeMessageGroup(Object groupId) { - List messageWrappers = this.template.find(whereGroupIdIs(groupId), MessageWrapper.class, this.collectionName); - for (MessageWrapper messageWrapper : messageWrappers) { - this.removeMessageFromGroup(groupId, messageWrapper.getMessage()); - } + this.template.remove(whereGroupIdIs(groupId), this.collectionName); } @Override public Iterator iterator() { - List groupedMessages = this.template.find(whereGroupIdExists(), MessageWrapper.class, this.collectionName); - Map messageGroups = new HashMap(); - for (MessageWrapper groupedMessage : groupedMessages) { - Object groupId = groupedMessage.get_GroupId(); - if (!messageGroups.containsKey(groupId)) { - messageGroups.put(groupId, this.getMessageGroup(groupId)); + return this.template.executeInSession(new DbCallback>() { + + @Override + public Iterator doInDB(DB db) throws MongoException, DataAccessException { + List messageGroups = new ArrayList(); + + Query query = Query.query(Criteria.where(GROUP_ID_KEY).exists(true)); + @SuppressWarnings("rawtypes") + List groupIds = template.getCollection(collectionName) + .distinct(GROUP_ID_KEY, query.getQueryObject()); + + for (Object groupId : groupIds) { + messageGroups.add(getMessageGroup(groupId)); + } + + return messageGroups.iterator(); } - } - return messageGroups.values().iterator(); + }); + } + @Override + public Message pollMessageFromGroup(final Object groupId) { + Assert.notNull(groupId, "'groupId' must not be null"); + return this.template.executeInSession(new DbCallback>() { + @Override + public Message doInDB(DB db) throws MongoException, DataAccessException { + Query query = whereGroupIdIs(groupId).with(new Sort(GROUP_UPDATE_TIMESTAMP_KEY, SEQUENCE)); + MessageWrapper messageWrapper = template.findAndRemove(query, MessageWrapper.class, collectionName); + Message message = null; + if (messageWrapper != null) { + message = messageWrapper.getMessage(); + } + updateGroup(groupId, lastModifiedUpdate()); + return message; + } + }); } @Override - public void completeGroup(Object groupId) { - Update update = Update.update(GROUP_COMPLETE_KEY, true); - Query q = whereGroupIdIs(groupId); - this.template.updateFirst(q, update, this.collectionName); - this.updateGroup(groupId); + public int messageGroupSize(Object groupId) { + long lCount = this.template.count(new Query(Criteria.where(GROUP_ID_KEY).is(groupId)), this.collectionName); + Assert.isTrue(lCount <= Integer.MAX_VALUE, "Message count is out of Integer's range"); + return (int) lCount; } @Override public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { - Update update = Update.update(LAST_RELEASED_SEQUENCE_NUMBER, sequenceNumber); - Query q = whereGroupIdIs(groupId); - this.template.updateFirst(q, update, this.collectionName); - this.updateGroup(groupId); + this.updateGroup(groupId, lastModifiedUpdate().set(LAST_RELEASED_SEQUENCE_NUMBER, sequenceNumber)); } @Override - public Message pollMessageFromGroup(Object groupId) { - Assert.notNull(groupId, "'groupId' must not be null"); - MessageWrapper messageWrapper = this.template.findAndRemove(whereGroupIdIsOrdered(groupId), MessageWrapper.class, this.collectionName); - Message message = null; - if (messageWrapper != null) { - message = messageWrapper.getMessage(); - } - this.updateGroup(groupId); - return message; + public void completeGroup(Object groupId) { + this.updateGroup(groupId, lastModifiedUpdate().set(GROUP_COMPLETE_KEY, true)); } @Override - public int messageGroupSize(Object groupId) { - long lCount = this.template.count(new Query(where(GROUP_ID_KEY).is(groupId)), this.collectionName); - Assert.isTrue(lCount <= Integer.MAX_VALUE, "Message count is out of Integer's range"); - return (int) lCount; + @ManagedAttribute + public int getMessageCountForAllMessageGroups() { + Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).exists(true) + .and(MessageDocumentFields.GROUP_ID).exists(true)); + long count = this.template.count(query, this.collectionName); + Assert.isTrue(count <= Integer.MAX_VALUE, "Message count is out of Integer's range"); + return (int) count; + } + + @Override + @ManagedAttribute + public int getMessageGroupCount() { + Query query = Query.query(Criteria.where(MessageDocumentFields.GROUP_ID).exists(true)); + return this.template.getCollection(this.collectionName) + .distinct(MessageDocumentFields.GROUP_ID, query.getQueryObject()) + .size(); + } + + private static Update lastModifiedUpdate() { + return Update.update(GROUP_UPDATE_TIMESTAMP_KEY, System.currentTimeMillis()); } /* @@ -319,35 +419,36 @@ public int messageGroupSize(Object groupId) { */ private static Query whereMessageIdIs(UUID id) { - return new Query(where("headers.id._value").is(id.toString())); + return new Query(Criteria.where("headers.id._value").is(id.toString())); } private static Query whereMessageIdIsAndGroupIdIs(UUID id, Object groupId) { - return new Query(where("headers.id._value").is(id.toString()).and(GROUP_ID_KEY).is(groupId)); + return new Query(Criteria.where("headers.id._value").is(id.toString()).and(GROUP_ID_KEY).is(groupId)); } - private static Query whereGroupIdIs(Object groupId) { - Query q = new Query(where(GROUP_ID_KEY).is(groupId)); - q.with(new Sort(Direction.DESC, GROUP_UPDATE_TIMESTAMP_KEY)); - return q; - } - private static Query whereGroupIdExists() { - return new Query(where(GROUP_ID_KEY).exists(true)); + private static Query whereGroupIdOrder(Object groupId) { + return whereGroupIdIs(groupId).with(new Sort(Sort.Direction.DESC, GROUP_UPDATE_TIMESTAMP_KEY, SEQUENCE)); } - private static Query whereGroupIdIsOrdered(Object groupId) { - Query q = new Query(where(GROUP_ID_KEY).is(groupId)).limit(1); - q.with(new Sort(Direction.ASC, CREATED_DATE)); - return q; + private static Query whereGroupIdIs(Object groupId) { + return new Query(Criteria.where(GROUP_ID_KEY).is(groupId)); } - private void updateGroup(Object groupId) { - Update update = Update.update(GROUP_UPDATE_TIMESTAMP_KEY, System.currentTimeMillis()); - Query q = whereGroupIdIs(groupId); - this.template.updateFirst(q, update, this.collectionName); + private void updateGroup(Object groupId, Update update) { + Query query = whereGroupIdIs(groupId).with(new Sort(Sort.Direction.DESC, GROUP_UPDATE_TIMESTAMP_KEY, SEQUENCE)); + this.template.updateFirst(query, update, this.collectionName); } + private int getNextId() { + Query query = Query.query(Criteria.where("_id").is(SEQUENCE_NAME)); + query.fields().include(SEQUENCE); + return (Integer) this.template.findAndModify(query, + new Update().inc(SEQUENCE, 1), + FindAndModifyOptions.options().returnNew(true).upsert(true), + Map.class, + this.collectionName).get(SEQUENCE); + } /** * Custom implementation of the {@link MappingMongoConverter} strategy. @@ -516,9 +617,9 @@ public DBObject convert(MessageHistory source) { BasicDBList dbList = new BasicDBList(); for (Properties properties : source) { BasicDBObject dbo = new BasicDBObject(); - dbo.put(NAME_PROPERTY, properties.getProperty(NAME_PROPERTY)); - dbo.put(TYPE_PROPERTY, properties.getProperty(TYPE_PROPERTY)); - dbo.put(TIMESTAMP_PROPERTY, properties.getProperty(TIMESTAMP_PROPERTY)); + dbo.put(MessageHistory.NAME_PROPERTY, properties.getProperty(MessageHistory.NAME_PROPERTY)); + dbo.put(MessageHistory.TYPE_PROPERTY, properties.getProperty(MessageHistory.TYPE_PROPERTY)); + dbo.put(MessageHistory.TIMESTAMP_PROPERTY, properties.getProperty(MessageHistory.TIMESTAMP_PROPERTY)); dbList.add(dbo); } obj.put("components", dbList); @@ -654,11 +755,13 @@ private static final class MessageWrapper { @SuppressWarnings("unused") private final String _messageType; + @SuppressWarnings("unused") private final Object payload; @SuppressWarnings("unused") private final Map headers; + @SuppressWarnings("unused") private final Message inputMessage; private volatile long _group_timestamp; @@ -669,6 +772,9 @@ private static final class MessageWrapper { private volatile boolean _group_complete; + @SuppressWarnings("unused") + private int sequence; + public MessageWrapper(Message message) { Assert.notNull(message, "'message' must not be null"); this.message = message; @@ -726,5 +832,11 @@ public void set_LastReleasedSequenceNumber(int lastReleasedSequenceNumber) { public void set_Group_complete(boolean completedGroup) { this._group_complete = completedGroup; } + + public void setSequence(int sequence) { + this.sequence = sequence; + } + } + } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java index 821714313bd..5ef7e539034 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/AbstractMongoDbMessageGroupStoreTests.java @@ -15,17 +15,14 @@ */ package org.springframework.integration.mongodb.store; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.util.Collection; import java.util.Iterator; import java.util.Properties; import java.util.UUID; +import com.mongodb.Mongo; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -44,8 +41,6 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; -import com.mongodb.Mongo; - /** * @author Oleg Zhurakousky * @author Gary Russell diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageGroupStoreTests.java index 6140b0d8583..135956550e1 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageGroupStoreTests.java @@ -15,10 +15,13 @@ */ package org.springframework.integration.mongodb.store; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.util.Map; +import com.mongodb.DBObject; +import com.mongodb.Mongo; +import org.hamcrest.Matchers; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -26,15 +29,15 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.channel.PriorityChannel; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.store.MessageStore; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; -import com.mongodb.DBObject; -import com.mongodb.Mongo; - /** * @author Amol Nayak * @author Artem Bilan @@ -73,7 +76,7 @@ public void testWithAggregatorWithShutdown() throws Exception { @Test @MongoDbAvailable public void testWithCustomConverter() throws Exception { - this.prepareMongoFactory("testConfigurableMongoDbMessageStore"); + this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ConfigurableMongoDbMessageStore-CustomConverter.xml", this.getClass()); context.refresh(); @@ -84,6 +87,68 @@ public void testWithCustomConverter() throws Exception { context.close(); } + @Test + @MongoDbAvailable + public void testPriorityChannel() throws Exception { + this.cleanupCollections(new SimpleMongoDbFactory(new Mongo(), "test")); + ClassPathXmlApplicationContext context = + new ClassPathXmlApplicationContext("ConfigurableMongoDbMessageStore-CustomConverter.xml", this.getClass()); + context.refresh(); + + Object priorityChannel = context.getBean("priorityChannel"); + assertThat(priorityChannel, Matchers.not(Matchers.instanceOf(PriorityChannel.class))); + assertThat(priorityChannel, Matchers.instanceOf(QueueChannel.class)); + + QueueChannel channel = (QueueChannel) priorityChannel; + + Message message = MessageBuilder.withPayload("1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 1).build(); + channel.send(message); + message = MessageBuilder.withPayload("-1").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, -1).build(); + channel.send(message); + message = MessageBuilder.withPayload("3").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); + channel.send(message); + message = MessageBuilder.withPayload("0").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 0).build(); + channel.send(message); + message = MessageBuilder.withPayload("2").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 2).build(); + channel.send(message); + message = MessageBuilder.withPayload("none").build(); + channel.send(message); + message = MessageBuilder.withPayload("31").setHeader(IntegrationMessageHeaderAccessor.PRIORITY, 3).build(); + channel.send(message); + + Message receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("3", receive.getPayload()); + + receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("31", receive.getPayload()); + + receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("2", receive.getPayload()); + + receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("1", receive.getPayload()); + + receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("0", receive.getPayload()); + + receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("-1", receive.getPayload()); + + receive = channel.receive(1000); + assertNotNull(receive); + assertEquals("none", receive.getPayload()); + + context.close(); + } + + + public static interface TestGateway { String service(String payload); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore-CustomConverter.xml b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore-CustomConverter.xml index 0fa44dde9ae..ef4b35e43cb 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore-CustomConverter.xml +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore-CustomConverter.xml @@ -12,18 +12,21 @@ - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java index 78d6e516c2a..af4ef200cea 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java @@ -15,14 +15,13 @@ */ package org.springframework.integration.mongodb.store; +import com.mongodb.Mongo; import org.junit.Test; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.integration.mongodb.rules.MongoDbAvailable; import org.springframework.integration.store.MessageStore; -import com.mongodb.Mongo; - /** * @author Oleg Zhurakousky * @author Gary Russell @@ -48,4 +47,5 @@ protected MessageStore getMessageStore() throws Exception { public void testWithAggregatorWithShutdown() throws Exception { super.testWithAggregatorWithShutdown("mongo-aggregator-config.xml"); } + } diff --git a/src/reference/docbook/channel.xml b/src/reference/docbook/channel.xml index 1c9dcd0b614..bc3dc146710 100644 --- a/src/reference/docbook/channel.xml +++ b/src/reference/docbook/channel.xml @@ -201,20 +201,20 @@ The DirectChannel internally delegates to a Message Dispatcher to invoke its subscribed Message Handlers, and that dispatcher can have a load-balancing strategy exposed via load-balancer or load-balancer-ref attributes (mutually exclusive). The load balancing strategy - is used by the Message Dispatcher to help determine how Messages are distributed amongst Message Handlers + is used by the Message Dispatcher to help determine how Messages are distributed amongst Message Handlers in the case that there are multiple Message Handlers subscribed to the same channel. - As a convinience the load-balancer attribute exposes enumeration of values pointing to pre-existing implementations + As a convinience the load-balancer attribute exposes enumeration of values pointing to pre-existing implementations of LoadBalancingStrategy. - The "round-robin" (load-balances across the handlers in rotation) and "none" (for the cases where one wants to explicitely disable load balancing) - are the only available values. - Other strategy implementations may be added in future versions. + The "round-robin" (load-balances across the handlers in rotation) and "none" (for the cases where one wants to explicitely disable load balancing) + are the only available values. + Other strategy implementations may be added in future versions. However, since version 3.0 you can provide your own implementation of the LoadBalancingStrategy and - inject it using load-balancer-ref attribute which should point to a bean that implements + inject it using load-balancer-ref attribute which should point to a bean that implements LoadBalancingStrategy. - + ]]> Note that load-balancer or load-balancer-ref attributes are mutually exclusive. @@ -666,8 +666,8 @@ payload to an Integer. The message store must be a PriorityCapableChannelMessageStore and, in this case, the namespace parser will declare a QueueChannel instead of a PriorityChannel. Implementations of the - PriorityCapableChannelMessageStore are currently provided for Redis - and JDBC. + PriorityCapableChannelMessageStore are currently provided for Redis, + JDBC and MongoDB. See . diff --git a/src/reference/docbook/message-store.xml b/src/reference/docbook/message-store.xml index 87976f6349b..8f946fda889 100644 --- a/src/reference/docbook/message-store.xml +++ b/src/reference/docbook/message-store.xml @@ -94,4 +94,17 @@ + + Spring Integration 4.0 introduced two new interfaces ChannelMessageStore - + to implement operations specific for QueueChannels, PriorityCapableChannelMessageStore - + to mark MessageStore implementation to be used for PriorityChannels and to provide + priority order for persisted Messages. The real behaviour depends on implementation. The Framework provides these implementations, + which can be used as a persistent MessageStore for PriorityChannel: + + + + + + + diff --git a/src/reference/docbook/mongodb.xml b/src/reference/docbook/mongodb.xml index 86eb8de6d34..104c22b020e 100644 --- a/src/reference/docbook/mongodb.xml +++ b/src/reference/docbook/mongodb.xml @@ -142,6 +142,49 @@ configurableStoreMessages. It is recommended to use this implementation for robust and flexible solutions when messages contain complex data types. +

+ MongodDB Channel Message Store + + + Starting with version 4.0 the new MongoDbChannelMessageStore + has been introduced. With priorityEnabled = true option it can be used as a reference for + <int:priority-queue>s to achieve priority order polling for + persisted message. The priority MonogDB document field is the + IntegrationMessageHeaderAccessor.PRIORITY (the priority Message header). + + + In addition MongoDB MessageStores add sequence field for MessageGroup + documents. The sequence value is a result of $inc operation for simple sequence + document from the same collection, which is created on demand. The sequence field is used in + poll operations to allow to have correct messages order, which isn't possible when only + timestamp field is used. + + + + It's not recommended to use the same MongoDbChannelMessageStore bean + for priority and non-priority, because priorityEnabled option applies to the entire store. + However the same collection can be used for both + MongoDbChannelMessageStore types, because the real message polling from + store is sorted and uses indexes.To configure that scenario, simply extend one message store bean + from the other: + + + + + + + + + + + + + + +]]> + + +
diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index 8196f1f57c0..35f72079095 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -110,6 +110,16 @@ For more information, see .
+
+ MongodDB Channel Message Store + + MongoDB support now provides the MongoDbChannelMessageStore - + a channel specific MessageStore implementation. + With priorityEnabled = true option it can be used as a reference for + <int:priority-queue>s to achieve priority order polling for + persisted message. For more information see . + +
@EnableIntegrationMBeanExport