diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index ccd6dc3787d..0ba43c9f9b6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -391,10 +391,10 @@ private Flux> createFluxGenerator() { fluxSink.complete(); } }) - .limitRequest( - this.maxMessagesPerPoll < 0 + .take(this.maxMessagesPerPoll < 0 ? Long.MAX_VALUE - : this.maxMessagesPerPoll); + : this.maxMessagesPerPoll, + true); } }) .subscribeOn(Schedulers.fromExecutor(this.taskExecutor)) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java index b71ce93f8ac..18ad27e4a54 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ public class MessageGroupMetadata implements Serializable { private static final long serialVersionUID = 1L; - private List messageIds = new LinkedList<>(); + private final List messageIds = new LinkedList<>(); private long timestamp; @@ -52,8 +52,7 @@ public class MessageGroupMetadata implements Serializable { private volatile String condition; - private MessageGroupMetadata() { - //For Jackson deserialization + public MessageGroupMetadata() { } public MessageGroupMetadata(MessageGroup messageGroup) { @@ -79,7 +78,7 @@ boolean add(UUID messageId) { return !this.messageIds.contains(messageId) && this.messageIds.add(messageId); } - void setLastModified(long lastModified) { + public void setLastModified(long lastModified) { this.lastModified = lastModified; } @@ -107,7 +106,7 @@ public List getMessageIds() { return new LinkedList(this.messageIds); } - void complete() { + public void complete() { this.complete = true; } @@ -123,11 +122,15 @@ public long getTimestamp() { return this.timestamp; } + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + public int getLastReleasedMessageSequenceNumber() { return this.lastReleasedMessageSequenceNumber; } - void setLastReleasedMessageSequenceNumber(int lastReleasedMessageSequenceNumber) { + public void setLastReleasedMessageSequenceNumber(int lastReleasedMessageSequenceNumber) { this.lastReleasedMessageSequenceNumber = lastReleasedMessageSequenceNumber; } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java index f383e53ddc3..b0f97a13736 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java @@ -21,7 +21,6 @@ import java.sql.Timestamp; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -38,6 +37,7 @@ import org.springframework.dao.IncorrectResultSizeDataAccessException; import org.springframework.integration.store.AbstractMessageGroupStore; import org.springframework.integration.store.MessageGroup; +import org.springframework.integration.store.MessageGroupMetadata; import org.springframework.integration.store.MessageMetadata; import org.springframework.integration.store.MessageStore; import org.springframework.integration.store.SimpleMessageGroup; @@ -334,11 +334,13 @@ public Message addMessage(final Message message) { @Override public void addMessagesToGroup(Object groupId, Message... messages) { String groupKey = getKey(groupId); - Map groupInfo = getGroupMetadata(groupKey); + MessageGroupMetadata groupMetadata = getGroupMetadata(groupKey); - Timestamp updatedDate = new Timestamp(System.currentTimeMillis()); - boolean groupNotExist = groupInfo.isEmpty(); - Timestamp createdDate = groupNotExist ? updatedDate : (Timestamp) groupInfo.get("CREATED_DATE"); + boolean groupNotExist = groupMetadata == null; + Timestamp createdDate = + groupNotExist + ? new Timestamp(System.currentTimeMillis()) + : new Timestamp(groupMetadata.getTimestamp()); for (Message message : messages) { addMessage(message); @@ -397,28 +399,39 @@ public int messageGroupSize(Object groupId) { @Override public MessageGroup getMessageGroup(Object groupId) { - String key = getKey(groupId); - Map groupInfo = getGroupMetadata(key); - - if (groupInfo.isEmpty()) { + MessageGroupMetadata groupMetadata = getGroupMetadata(groupId); + if (groupMetadata != null) { + MessageGroup messageGroup = + getMessageGroupFactory() + .create(this, groupId, groupMetadata.getTimestamp(), groupMetadata.isComplete()); + messageGroup.setLastModified(groupMetadata.getLastModified()); + messageGroup.setLastReleasedMessageSequenceNumber(groupMetadata.getLastReleasedMessageSequenceNumber()); + messageGroup.setCondition(groupMetadata.getCondition()); + return messageGroup; + } + else { return new SimpleMessageGroup(groupId); } - - MessageGroup messageGroup = getMessageGroupFactory() - .create(this, groupId, ((Timestamp) groupInfo.get("CREATED_DATE")).getTime(), - ((Long) groupInfo.get("COMPLETE")) > 0); - messageGroup.setLastModified(((Timestamp) groupInfo.get("UPDATED_DATE")).getTime()); - messageGroup.setLastReleasedMessageSequenceNumber(((Long) groupInfo.get("LAST_RELEASED_SEQUENCE")).intValue()); - messageGroup.setCondition((String) groupInfo.get("CONDITION")); - return messageGroup; } - private Map getGroupMetadata(String groupKey) { + @Override + public MessageGroupMetadata getGroupMetadata(Object groupId) { + String key = getKey(groupId); try { - return this.jdbcTemplate.queryForMap(getQuery(Query.GET_GROUP_INFO), groupKey, this.region); + return this.jdbcTemplate.queryForObject(getQuery(Query.GET_GROUP_INFO), (rs, rowNum) -> { + MessageGroupMetadata groupMetadata = new MessageGroupMetadata(); + if (rs.getInt("COMPLETE") > 0) { + groupMetadata.complete(); + } + groupMetadata.setTimestamp(rs.getTimestamp("CREATED_DATE").getTime()); + groupMetadata.setLastModified(rs.getTimestamp("UPDATED_DATE").getTime()); + groupMetadata.setLastReleasedMessageSequenceNumber(rs.getInt("LAST_RELEASED_SEQUENCE")); + groupMetadata.setCondition(rs.getString("CONDITION")); + return groupMetadata; + }, key, this.region); } catch (IncorrectResultSizeDataAccessException ex) { - return Collections.emptyMap(); + return null; } }