diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index e8bb6c50..e17cc650 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -71,22 +71,21 @@ import org.springframework.util.StringUtils; import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.StreamStatus; /** - * The {@link MessageProducerSupport} implementation for receiving data from Amazon - * Kinesis stream(s). + * The {@link MessageProducerSupport} implementation for receiving data from Amazon Kinesis + * stream(s). 
* * @author Artem Bilan * @author Krzysztof Witkowski @@ -115,8 +114,10 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport private final ShardConsumerManager shardConsumerManager = new ShardConsumerManager(); - private final ExecutorService shardLocksExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory( - (getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-")); + private final ExecutorService shardLocksExecutor = + Executors.newSingleThreadExecutor( + new CustomizableThreadFactory( + (getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-")); private String consumerGroup = "SpringIntegration"; @@ -179,13 +180,16 @@ public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, String... this.streams = Arrays.copyOf(streams, streams.length); } - public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets) { + public KinesisMessageDrivenChannelAdapter( + AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets) { Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null."); Assert.notEmpty(shardOffsets, "'shardOffsets' must not be null."); Assert.noNullElements(shardOffsets, "'shardOffsets' must not contain null elements."); for (KinesisShardOffset shardOffset : shardOffsets) { - Assert.isTrue(StringUtils.hasText(shardOffset.getStream()) && StringUtils.hasText(shardOffset.getShard()), + Assert.isTrue( + StringUtils.hasText(shardOffset.getStream()) + && StringUtils.hasText(shardOffset.getShard()), "The 'shardOffsets' must be provided with particular 'stream' and 'shard' values."); this.shardOffsets.add(new KinesisShardOffset(shardOffset)); } @@ -225,8 +229,8 @@ public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence) { } /** - * Specify a {@link Converter} to deserialize the {@code byte[]} from record's body. - * Can be {@code null} meaning no deserialization. 
+ * Specify a {@link Converter} to deserialize the {@code byte[]} from record's body. Can be {@code + * null} meaning no deserialization. * @param converter the {@link Converter} to use or null */ public void setConverter(Converter converter) { @@ -253,8 +257,7 @@ public void setCheckpointsInterval(long checkpointsInterval) { } /** - * The maximum record to poll per on get-records request. Not greater then - * {@code 10000}. + * The maximum record to poll per on get-records request. Not greater then {@code 10000}. * @param recordsLimit the number of records to for per on get-records request. * @see GetRecordsRequest#setLimit */ @@ -282,11 +285,12 @@ public void setStartTimeout(int startTimeout) { } /** - * The maximum number of concurrent {@link ConsumerInvoker}s running. The - * {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s. - * Messages from within the same shard will be processed sequentially. In other words - * each shard is tied with the particular thread. By default the concurrency is - * unlimited and shard is processed in the {@link #consumerExecutor} directly. + * The maximum number of concurrent {@link ConsumerInvoker}s running. The {@link ShardConsumer}s + * are evenly distributed between {@link ConsumerInvoker}s. Messages from within the same shard + * will be processed sequentially. In other words each shard is tied with the particular thread. + * By default the concurrency is unlimited and shard is processed in the {@link #consumerExecutor} + * directly. + * * @param concurrency the concurrency maximum number */ public void setConcurrency(int concurrency) { @@ -294,8 +298,9 @@ public void setConcurrency(int concurrency) { } /** - * The sleep interval in milliseconds used in the main loop between shards polling - * cycles. Defaults to {@code 1000}l minimum {@code 250}. + * The sleep interval in milliseconds used in the main loop between shards polling cycles. + * Defaults to {@code 1000}l minimum {@code 250}. 
+ * * @param idleBetweenPolls the interval to sleep between shards polling cycles. */ public void setIdleBetweenPolls(int idleBetweenPolls) { @@ -303,8 +308,9 @@ public void setIdleBetweenPolls(int idleBetweenPolls) { } /** - * Specify an {@link InboundMessageMapper} to extract message headers embedded into - * the record data. + * Specify an {@link InboundMessageMapper} to extract message headers embedded into the record + * data. + * * @param embeddedHeadersMapper the {@link InboundMessageMapper} to use. * @since 2.0 */ @@ -313,8 +319,9 @@ public void setEmbeddedHeadersMapper(InboundMessageMapper embeddedHeader } /** - * Specify a {@link LockRegistry} for an exclusive access to provided streams. This is - * not used when shards-based configuration is provided. + * Specify a {@link LockRegistry} for an exclusive access to provided streams. This is not used + * when shards-based configuration is provided. + * * @param lockRegistry the {@link LockRegistry} to use. * @since 2.0 */ @@ -323,9 +330,9 @@ public void setLockRegistry(LockRegistry lockRegistry) { } /** - * Set to true to bind the source consumer record in the header named - * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. Does not apply to batch - * listeners. + * Set to true to bind the source consumer record in the header named {@link + * IntegrationMessageHeaderAccessor#SOURCE_DATA}. Does not apply to batch listeners. + * * @param bindSourceRecord true to bind. * @since 2.2 */ @@ -338,12 +345,16 @@ protected void onInit() { super.onInit(); if (this.consumerExecutor == null) { - this.consumerExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory( - (getComponentName() == null ? "" : getComponentName()) + "-kinesis-consumer-")); + this.consumerExecutor = + Executors.newCachedThreadPool( + new CustomizableThreadFactory( + (getComponentName() == null ? 
"" : getComponentName()) + "-kinesis-consumer-")); } if (this.dispatcherExecutor == null) { - this.dispatcherExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory( - (getComponentName() == null ? "" : getComponentName()) + "-kinesis-dispatcher-")); + this.dispatcherExecutor = + Executors.newCachedThreadPool( + new CustomizableThreadFactory( + (getComponentName() == null ? "" : getComponentName()) + "-kinesis-dispatcher-")); } if (this.streams == null) { @@ -366,14 +377,19 @@ public void destroy() { @ManagedOperation public void stopConsumer(String stream, String shard) { - ShardConsumer shardConsumer = this.shardConsumers.remove(KinesisShardOffset.latest(stream, shard)); + ShardConsumer shardConsumer = + this.shardConsumers.remove(KinesisShardOffset.latest(stream, shard)); if (shardConsumer != null) { shardConsumer.stop(); } else { if (this.logger.isDebugEnabled()) { this.logger.debug( - "There is no ShardConsumer for shard [" + shard + "] in stream [" + shard + "] to stop."); + "There is no ShardConsumer for shard [" + + shard + + "] in stream [" + + shard + + "] to stop."); } } } @@ -410,19 +426,28 @@ public void resetCheckpointForShardToTrimHorizon(String stream, String shard) { } @ManagedOperation - public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber) { - restartShardConsumerForOffset(KinesisShardOffset.atSequenceNumber(stream, shard, sequenceNumber)); + public void resetCheckpointForShardToSequenceNumber( + String stream, String shard, String sequenceNumber) { + restartShardConsumerForOffset( + KinesisShardOffset.atSequenceNumber(stream, shard, sequenceNumber)); } @ManagedOperation public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp) { - restartShardConsumerForOffset(KinesisShardOffset.atTimestamp(stream, shard, new Date(timestamp))); + restartShardConsumerForOffset( + KinesisShardOffset.atTimestamp(stream, shard, new Date(timestamp))); } private void 
restartShardConsumerForOffset(KinesisShardOffset shardOffset) { - Assert.isTrue(this.shardOffsets.contains(shardOffset), - "The [" + KinesisMessageDrivenChannelAdapter.this + "] doesn't operate shard [" + shardOffset.getShard() - + "] for stream [" + shardOffset.getStream() + "]"); + Assert.isTrue( + this.shardOffsets.contains(shardOffset), + "The [" + + KinesisMessageDrivenChannelAdapter.this + + "] doesn't operate shard [" + + shardOffset.getShard() + + "] for stream [" + + shardOffset.getStream() + + "]"); if (logger.isDebugEnabled()) { logger.debug("Resetting consumer for [" + shardOffset + "]..."); @@ -454,10 +479,12 @@ public void resetCheckpoints() { @Override protected void doStart() { super.doStart(); - if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) { + if (ListenerMode.batch.equals(this.listenerMode) + && CheckpointMode.record.equals(this.checkpointMode)) { this.checkpointMode = CheckpointMode.batch; - logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] " - + "because it does not make sense in case of [ListenerMode.batch]."); + logger.warn( + "The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] " + + "because it does not make sense in case of [ListenerMode.batch]."); } if (this.streams != null) { @@ -499,6 +526,51 @@ private Collection shardConsumerSubset(int i) { } } + private List readShardList(String stream) { + return this.readShardList(stream, 0); + } + + private List readShardList(String stream, int retryCount) { + List shardList = new ArrayList<>(); + + if (retryCount > this.describeStreamRetries) { + throw new IllegalStateException( + "Kinesis could not read shards from stream with name [" + stream + "] "); + } + + ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(stream); + + try { + ListShardsResult listShardsResult = this.amazonKinesis.listShards(listShardsRequest); + + 
shardList.addAll(listShardsResult.getShards()); + + String nextToken = listShardsResult.getNextToken(); + while (nextToken != null) { + ListShardsResult nextPage = + this.amazonKinesis.listShards(new ListShardsRequest().withNextToken(nextToken)); + shardList.addAll(nextPage.getShards()); + nextToken = nextPage.getNextToken(); + } + + } + catch (LimitExceededException limitExceededException) { + + logger.info( + "Got LimitExceededException when listing stream [" + + stream + + "]. " + + "Backing off for [" + + this.describeStreamBackoff + + "] millis."); + + try { + Thread.sleep(this.describeStreamBackoff); + shardList.addAll(readShardList(stream, retryCount + 1)); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "The [describeStream] thread for the stream [" + stream + "] has been interrupted.", + ex); + } + } + + return shardList; + } + private void populateShardsForStreams() { this.shardOffsets.clear(); final CountDownLatch shardsGatherLatch = new CountDownLatch(this.streams.length); @@ -507,103 +579,89 @@ private void populateShardsForStreams() { } try { if (!shardsGatherLatch.await(this.startTimeout, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("The [ " + KinesisMessageDrivenChannelAdapter.this + "] could not start during timeout: " + this.startTimeout); + throw new IllegalStateException( + "The [ " + + KinesisMessageDrivenChannelAdapter.this + + "] could not start during timeout: " + + this.startTimeout); } } catch (InterruptedException e) { throw new IllegalStateException( - "The [ " + KinesisMessageDrivenChannelAdapter.this + "] has been interrupted from start."); + "The [ " + + KinesisMessageDrivenChannelAdapter.this + + "] has been interrupted from start."); } } - private void populateShardsForStream(final String stream, final CountDownLatch shardsGatherLatch) { - this.dispatcherExecutor.execute(() -> { - try { - int describeStreamRetries = 0; - List shardsToConsume = new ArrayList<>(); + private List detectShardsToConsume(String stream) { + return detectShardsToConsume(stream, 0); + } - String exclusiveStartShardId = null; - while (true) { - DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(stream) -
.withExclusiveStartShardId(exclusiveStartShardId); + private List detectShardsToConsume(String stream, int retry) { + List shardsToConsume = new ArrayList<>(); - DescribeStreamResult describeStreamResult = null; - // Call DescribeStream, with backoff and retries (if we get - // LimitExceededException). - try { - describeStreamResult = this.amazonKinesis.describeStream(describeStreamRequest); - } - catch (Exception e) { - logger.info("Got an exception when describing stream [" + stream + "]. " + "Backing off for [" - + this.describeStreamBackoff + "] millis.", e); - } + List shards = readShardList(stream); - if (describeStreamResult == null || !StreamStatus.ACTIVE.toString() - .equals(describeStreamResult.getStreamDescription().getStreamStatus())) { + try { + for (Shard shard : shards) { + String key = buildCheckpointKeyForShard(stream, shard.getShardId()); + String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); + if (endingSequenceNumber != null) { + String checkpoint = this.checkpointStore.get(key); + + boolean skipClosedShard = checkpoint != null && new BigInteger(endingSequenceNumber) + .compareTo(new BigInteger(checkpoint)) <= 0; + + if (logger.isTraceEnabled()) { + logger.trace("The shard [" + shard + "] in stream [" + stream + + "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber + + "].\nThe last processed checkpoint is [" + checkpoint + "]." + + (skipClosedShard ? "\nThe shard will be skipped." 
: "")); + } - if (describeStreamRetries++ > this.describeStreamRetries) { - ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException( - "The stream [" + stream + "] isn't ACTIVE or doesn't exist."); - resourceNotFoundException.setServiceName("Kinesis"); - throw resourceNotFoundException; - } - try { - Thread.sleep(this.describeStreamBackoff); - continue; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException( - "The [describeStream] thread for the stream [" + stream + "] has been interrupted.", - e); - } + if (skipClosedShard) { + // Skip CLOSED shard which has been read before + // according a checkpoint + continue; } + } - List shards = describeStreamResult.getStreamDescription().getShards(); + shardsToConsume.add(shard); + } + } + catch (Exception e) { + String exceptionMessage = "Got an exception when processing shards in stream [" + stream + "]"; + logger.info(exceptionMessage + ".\n Retrying... ", e); + if (retry > 5) { + throw new IllegalStateException("Error processing shards in stream [\" + stream + \"].", e); + } + //Retry + detectShardsToConsume(stream, retry++); + sleep(this.describeStreamBackoff, new IllegalStateException(exceptionMessage), false); + } - try { - for (Shard shard : shards) { - String key = buildCheckpointKeyForShard(stream, shard.getShardId()); - String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); - if (endingSequenceNumber != null) { - String checkpoint = this.checkpointStore.get(key); - - boolean skipClosedShard = checkpoint != null && new BigInteger(endingSequenceNumber) - .compareTo(new BigInteger(checkpoint)) <= 0; - - if (logger.isTraceEnabled()) { - logger.trace("The shard [" + shard + "] in stream [" + stream - + "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber - + "].\nThe last processed checkpoint is [" + checkpoint + "]." - + (skipClosedShard ? "\nThe shard will be skipped." 
: "")); - } + return shardsToConsume; - if (skipClosedShard) { - // Skip CLOSED shard which has been read before - // according a checkpoint - continue; - } - } + } - shardsToConsume.add(shard); - } - } - catch (Exception e) { - logger.info( - "Got an exception when processing shards in stream [" + stream + "].\n" + "Retrying...", - e); - continue; - } + private void sleep(long sleepAmount, RuntimeException error, boolean interruptThread) { + try { + Thread.sleep(sleepAmount); + } + catch (Exception e) { + if (interruptThread) { + Thread.currentThread().interrupt(); + } + logger.error(error.getMessage(), e); + throw error; + } + } - if (describeStreamResult.getStreamDescription().getHasMoreShards()) { - exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); - describeStreamRetries = 0; - } - else { - break; - } - } + private void populateShardsForStream(final String stream, final CountDownLatch shardsGatherLatch) { + this.dispatcherExecutor.execute(() -> { + try { + List shardsToConsume = detectShardsToConsume(stream); for (Shard shard : shardsToConsume) { KinesisShardOffset shardOffset = new KinesisShardOffset(this.streamInitialSequence); @@ -644,7 +702,8 @@ private void populateConsumer(KinesisShardOffset shardOffset) { if (this.active) { synchronized (this.consumerInvokers) { if (this.consumerInvokers.size() < this.maxConcurrency) { - ConsumerInvoker consumerInvoker = new ConsumerInvoker(Collections.singleton(shardConsumer)); + ConsumerInvoker consumerInvoker = + new ConsumerInvoker(Collections.singleton(shardConsumer)); this.consumerInvokers.add(consumerInvoker); this.consumerExecutor.execute(consumerInvoker); } @@ -692,8 +751,9 @@ private void stopConsumers() { } /** - * If there's an error channel, we create a new attributes holder here. Then set the - * attributes for use by the {@link ErrorMessageStrategy}. + * If there's an error channel, we create a new attributes holder here. 
Then set the attributes + * for use by the {@link ErrorMessageStrategy}. + * * @param record the Kinesis record to use. * @param message the Spring Messaging message to use. */ @@ -706,7 +766,8 @@ private void setAttributesIfNecessary(Object record, Message message) { } @Override - protected AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message message) { + protected AttributeAccessor getErrorMessageAttributes( + org.springframework.messaging.Message message) { AttributeAccessor attributes = attributesHolder.get(); if (attributes == null) { return super.getErrorMessageAttributes(message); @@ -718,8 +779,21 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag @Override public String toString() { - return "KinesisMessageDrivenChannelAdapter{" + "shardOffsets=" + this.shardOffsets + ", consumerGroup='" - + this.consumerGroup + '\'' + '}'; + return "KinesisMessageDrivenChannelAdapter{" + + "shardOffsets=" + + this.shardOffsets + + ", consumerGroup='" + + this.consumerGroup + + '\'' + + '}'; + } + + private enum ConsumerState { + NEW, + EXPIRED, + CONSUME, + SLEEP, + STOP } private final class ConsumerDispatcher implements SchedulingAwareRunnable { @@ -741,8 +815,8 @@ public void run() { } } - Iterator iterator = KinesisMessageDrivenChannelAdapter.this.shardConsumers.values() - .iterator(); + Iterator iterator = + KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator(); while (iterator.hasNext()) { ShardConsumer shardConsumer = iterator.next(); shardConsumer.execute(); @@ -763,14 +837,8 @@ public void run() { } } } - - try { - Thread.sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("ConsumerDispatcher Thread [" + this + "] has been interrupted", e); - } + String errorMsg = "ConsumerDispatcher Thread [" + this + "] has been interrupted"; + 
sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls, new IllegalStateException(errorMsg), true); } } @@ -778,7 +846,6 @@ public void run() { public boolean isLongLived() { return true; } - } private final class ShardConsumer { @@ -787,12 +854,10 @@ private final class ShardConsumer { private final ShardCheckpointer checkpointer; - private long nextCheckpointTimeInMillis; - - private final Runnable processTask = processTask(); - private final String key; + private long nextCheckpointTimeInMillis; + private Runnable notifier; private volatile ConsumerState state = ConsumerState.NEW; @@ -803,11 +868,13 @@ private final class ShardConsumer { private volatile long sleepUntil; + private final Runnable processTask = processTask(); + ShardConsumer(KinesisShardOffset shardOffset) { this.shardOffset = new KinesisShardOffset(shardOffset); this.key = buildCheckpointKeyForShard(shardOffset.getStream(), shardOffset.getShard()); - this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, - this.key); + this.checkpointer = + new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, this.key); } void setNotifier(Runnable notifier) { @@ -832,36 +899,39 @@ void close() { void execute() { if (this.task == null) { switch (this.state) { - case NEW: case EXPIRED: - this.task = () -> { - try { - if (this.shardOffset.isReset()) { - this.checkpointer.remove(); - } - else { - String checkpoint = this.checkpointer.getCheckpoint(); - if (checkpoint != null) { - this.shardOffset.setSequenceNumber(checkpoint); - this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + this.task = + () -> { + try { + if (this.shardOffset.isReset()) { + this.checkpointer.remove(); + } + else { + String checkpoint = this.checkpointer.getCheckpoint(); + if (checkpoint != null) { + this.shardOffset.setSequenceNumber(checkpoint); + this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + } + } + if 
(logger.isInfoEnabled() && this.state == ConsumerState.NEW) { + logger.info("The [" + this + "] has been started."); + } + GetShardIteratorRequest shardIteratorRequest = + this.shardOffset.toShardIteratorRequest(); + this.shardIterator = + KinesisMessageDrivenChannelAdapter.this + .amazonKinesis + .getShardIterator(shardIteratorRequest) + .getShardIterator(); + if (ConsumerState.STOP != this.state) { + this.state = ConsumerState.CONSUME; + } } - } - if (logger.isInfoEnabled() && this.state == ConsumerState.NEW) { - logger.info("The [" + this + "] has been started."); - } - GetShardIteratorRequest shardIteratorRequest = - this.shardOffset.toShardIteratorRequest(); - this.shardIterator = KinesisMessageDrivenChannelAdapter.this.amazonKinesis - .getShardIterator(shardIteratorRequest).getShardIterator(); - if (ConsumerState.STOP != this.state) { - this.state = ConsumerState.CONSUME; - } - } - finally { - this.task = null; - } - }; + finally { + this.task = null; + } + }; break; case CONSUME: @@ -878,9 +948,12 @@ void execute() { case STOP: if (this.shardIterator == null) { if (logger.isInfoEnabled()) { - logger.info("Stopping the [" + this + "] on the checkpoint [" - + this.checkpointer.getCheckpoint() - + "] because the shard has been CLOSED and exhausted."); + logger.info( + "Stopping the [" + + this + + "] on the checkpoint [" + + this.checkpointer.getCheckpoint() + + "] because the shard has been CLOSED and exhausted."); } } else { @@ -934,18 +1007,22 @@ private Runnable processTask() { // Shard is closed: nothing to consume any more. // Resharding is possible. 
if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) { - KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher - .publishEvent(new KinesisShardEndedEvent(KinesisMessageDrivenChannelAdapter.this, - this.key)); + KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent( + new KinesisShardEndedEvent(KinesisMessageDrivenChannelAdapter.this, this.key)); } stop(); } if (ConsumerState.STOP != this.state && result.getRecords().isEmpty()) { if (logger.isDebugEnabled()) { - logger.debug("No records for [" + this + "] on sequenceNumber [" - + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" - + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds."); + logger.debug( + "No records for [" + + this + + "] on sequenceNumber [" + + this.checkpointer.getLastCheckpointValue() + + "]. Suspend consuming for [" + + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + + "] milliseconds."); } prepareSleepState(); } @@ -966,15 +1043,21 @@ private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { // Lets acquire iterator again (using checkpointer for iterator start // sequence number). 
if (logger.isInfoEnabled()) { - logger.info("Shard iterator for [" + ShardConsumer.this + "] expired.\n" - + "A new one will be started from the check pointed sequence number."); + logger.info( + "Shard iterator for [" + + ShardConsumer.this + + "] expired.\n" + + "A new one will be started from the check pointed sequence number."); } this.state = ConsumerState.EXPIRED; } catch (ProvisionedThroughputExceededException e) { if (logger.isWarnEnabled()) { - logger.warn("GetRecords request throttled for [" + ShardConsumer.this + "] with the reason: " - + e.getErrorMessage()); + logger.warn( + "GetRecords request throttled for [" + + ShardConsumer.this + + "] with the reason: " + + e.getErrorMessage()); } // We are throttled, so let's sleep prepareSleepState(); @@ -984,8 +1067,8 @@ private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { } private void prepareSleepState() { - ShardConsumer.this.sleepUntil = System.currentTimeMillis() - + KinesisMessageDrivenChannelAdapter.this.consumerBackoff; + ShardConsumer.this.sleepUntil = + System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.consumerBackoff; ShardConsumer.this.state = ConsumerState.SLEEP; } @@ -1015,7 +1098,8 @@ private void processSingleRecord(Record record) { } private void processMultipleRecords(List records) { - AbstractIntegrationMessageBuilder messageBuilder = getMessageBuilderFactory().withPayload(records); + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory().withPayload(records); if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) { List> payload = records.stream() @@ -1029,18 +1113,23 @@ else if (KinesisMessageDrivenChannelAdapter.this.converter != null) { final List partitionKeys = new ArrayList<>(); final List sequenceNumbers = new ArrayList<>(); - List payload = records.stream() - .map(r -> { - partitionKeys.add(r.getPartitionKey()); - sequenceNumbers.add(r.getSequenceNumber()); - - return 
KinesisMessageDrivenChannelAdapter.this.converter.convert(r.getData().array()); - }) - .collect(Collectors.toList()); + List payload = + records.stream() + .map( + r -> { + partitionKeys.add(r.getPartitionKey()); + sequenceNumbers.add(r.getSequenceNumber()); + + return KinesisMessageDrivenChannelAdapter.this.converter.convert( + r.getData().array()); + }) + .collect(Collectors.toList()); - messageBuilder = getMessageBuilderFactory().withPayload(payload) - .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys) - .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, sequenceNumbers); + messageBuilder = + getMessageBuilderFactory() + .withPayload(payload) + .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys) + .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, sequenceNumbers); } performSend(messageBuilder, records); @@ -1052,8 +1141,9 @@ private AbstractIntegrationMessageBuilder prepareMessageForRecord(Record if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) { try { - messageToUse = KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper - .toMessage((byte[]) payload); + messageToUse = + KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage( + (byte[]) payload); payload = messageToUse.getPayload(); } @@ -1066,9 +1156,11 @@ private AbstractIntegrationMessageBuilder prepareMessageForRecord(Record payload = KinesisMessageDrivenChannelAdapter.this.converter.convert((byte[]) payload); } - AbstractIntegrationMessageBuilder messageBuilder = getMessageBuilderFactory().withPayload(payload) - .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey()) - .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber()); + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory() + .withPayload(payload) + .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey()) + .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber()); if 
(KinesisMessageDrivenChannelAdapter.this.bindSourceRecord) { messageBuilder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); @@ -1081,8 +1173,10 @@ private AbstractIntegrationMessageBuilder prepareMessageForRecord(Record return messageBuilder; } - private void performSend(AbstractIntegrationMessageBuilder messageBuilder, Object rawRecord) { - messageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream()) + private void performSend( + AbstractIntegrationMessageBuilder messageBuilder, Object rawRecord) { + messageBuilder + .setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream()) .setHeader(AwsHeaders.SHARD, this.shardOffset.getShard()); if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) { @@ -1095,8 +1189,15 @@ private void performSend(AbstractIntegrationMessageBuilder messageBuilder, Ob sendMessage(messageToSend); } catch (Exception e) { - logger.info("Got an exception during sending a '" + messageToSend + "'" + "\nfor the '" + rawRecord - + "'.\n" + "Consider to use 'errorChannel' flow for the compensation logic.", e); + logger.info( + "Got an exception during sending a '" + + messageToSend + + "'" + + "\nfor the '" + + rawRecord + + "'.\n" + + "Consider to use 'errorChannel' flow for the compensation logic.", + e); } } @@ -1122,7 +1223,8 @@ private void checkpointIfPeriodicMode(@Nullable Record record) { this.checkpointer.checkpoint(record.getSequenceNumber()); } this.nextCheckpointTimeInMillis = - System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval; + System.currentTimeMillis() + + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval; } } @@ -1130,13 +1232,6 @@ private void checkpointIfPeriodicMode(@Nullable Record record) { public String toString() { return "ShardConsumer{" + "shardOffset=" + this.shardOffset + ", state=" + this.state + '}'; } - - } - - private enum ConsumerState { - - NEW, EXPIRED, CONSUME, SLEEP, STOP - } private 
final class ConsumerInvoker implements SchedulingAwareRunnable { @@ -1171,7 +1266,8 @@ public void run() { catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IllegalStateException("ConsumerInvoker thread [" + this + "] has been interrupted", e); + throw new IllegalStateException( + "ConsumerInvoker thread [" + this + "] has been interrupted", e); } for (Iterator iterator = this.consumers.iterator(); iterator.hasNext(); ) { @@ -1185,8 +1281,13 @@ public void run() { shardConsumer.task.run(); } catch (Exception e) { - logger.info("Got an exception " + e + " during [" + shardConsumer + "] task invocation" - + ".\nProcess will be retried on the next iteration."); + logger.info( + "Got an exception " + + e + + " during [" + + shardConsumer + + "] task invocation" + + ".\nProcess will be retried on the next iteration."); } } } @@ -1209,12 +1310,12 @@ public void run() { public boolean isLongLived() { return true; } - } private final class ShardConsumerManager implements SchedulingAwareRunnable { - private final Map shardOffsetsToConsumer = new ConcurrentHashMap<>(); + private final Map shardOffsetsToConsumer = + new ConcurrentHashMap<>(); private final Map locks = new HashMap<>(); @@ -1224,7 +1325,8 @@ private final class ShardConsumerManager implements SchedulingAwareRunnable { } void addShardToConsume(KinesisShardOffset kinesisShardOffset) { - String lockKey = buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard()); + String lockKey = + buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard()); this.shardOffsetsToConsumer.put(lockKey, kinesisShardOffset); } @@ -1237,31 +1339,34 @@ public void run() { try { while (!Thread.currentThread().isInterrupted()) { - this.shardOffsetsToConsumer.entrySet().removeIf(entry -> { - boolean remove = true; - if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) { - String key = entry.getKey(); - Lock lock = 
KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain(key); - try { - if (lock.tryLock()) { - this.locks.put(key, lock); - } - else { - remove = false; - } - - } - catch (Exception e) { - logger.error("Error during locking: " + lock, e); - } - } - - if (remove) { - populateConsumer(entry.getValue()); - } - - return remove; - }); + this.shardOffsetsToConsumer + .entrySet() + .removeIf( + entry -> { + boolean remove = true; + if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) { + String key = entry.getKey(); + Lock lock = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain(key); + try { + if (lock.tryLock()) { + this.locks.put(key, lock); + } + else { + remove = false; + } + + } + catch (Exception e) { + logger.error("Error during locking: " + lock, e); + } + } + + if (remove) { + populateConsumer(entry.getValue()); + } + + return remove; + }); while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) { String lockKey = this.forUnlocking.poll(); @@ -1281,14 +1386,7 @@ public void run() { } } - try { - Thread.sleep(250); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException( - "ShardConsumerManager Thread [" + this + "] has been interrupted", e); - } + sleep(250, new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"), true); } } finally { @@ -1311,7 +1409,5 @@ public void run() { public boolean isLongLived() { return true; } - } - } diff --git a/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java b/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java index 4694b385..9381ad40 100644 --- a/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java +++ b/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java @@ -46,8 +46,8 @@ public abstract class 
AbstractMessageAttributesHeaderMapper implements Header private static final Log logger = LogFactory.getLog(SqsHeaderMapper.class); - private volatile String[] outboundHeaderNames = { "!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP, - "!" + AwsHeaders.MESSAGE_ID, "!" + AwsHeaders.QUEUE, "!" + AwsHeaders.TOPIC, "*" }; + private volatile String[] outboundHeaderNames = {"!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP, + "!" + AwsHeaders.MESSAGE_ID, "!" + AwsHeaders.QUEUE, "!" + AwsHeaders.TOPIC, "*"}; /** * Provide the header names that should be mapped to a AWS request object attributes diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index 3fdae5ae..c7da48a6 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -60,21 +60,20 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import 
com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.StreamDescription; -import com.amazonaws.services.kinesis.model.StreamStatus; /** * @author Artem Bilan + * @author Matthias Wesolowski * @since 1.1 */ @SpringJUnitConfig @@ -109,7 +108,7 @@ void setup() { } @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) void testKinesisMessageDrivenChannelAdapter() { this.kinesisMessageDrivenChannelAdapter.start(); final Set shardOffsets = TestUtils.getPropertyValue(this.kinesisMessageDrivenChannelAdapter, @@ -214,34 +213,35 @@ void testResharding() throws InterruptedException { assertThat(n).isLessThan(100); // When resharding happens the describeStream() is performed again - verify(this.amazonKinesisForResharding, atLeast(1)).describeStream(any(DescribeStreamRequest.class)); + verify(this.amazonKinesisForResharding, atLeast(1)) + .listShards(any(ListShardsRequest.class)); this.reshardingChannelAdapter.stop(); KinesisShardEndedEvent kinesisShardEndedEvent = this.config.shardEndedEventReference.get(); assertThat(kinesisShardEndedEvent).isNotNull() - .extracting(KinesisShardEndedEvent::getShardKey) - .isEqualTo("SpringIntegration:streamForResharding:closedShard"); + .extracting(KinesisShardEndedEvent::getShardKey) + .isEqualTo("SpringIntegration:streamForResharding:closedShard"); } @Configuration @EnableIntegration public static class Config { + private final AtomicReference shardEndedEventReference = new AtomicReference<>(); + @Bean public AmazonKinesis amazonKinesis() { AmazonKinesis amazonKinesis = mock(AmazonKinesis.class); - given(amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(STREAM1))).willReturn( - new DescribeStreamResult().withStreamDescription( - new StreamDescription().withStreamName(STREAM1).withStreamStatus(StreamStatus.UPDATING)), - new DescribeStreamResult().withStreamDescription(new StreamDescription().withStreamName(STREAM1) - 
.withStreamStatus(StreamStatus.ACTIVE).withHasMoreShards(false) + given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM1))).willReturn( + new ListShardsResult() .withShards(new Shard().withShardId("1").withSequenceNumberRange(new SequenceNumberRange()), new Shard().withShardId("2").withSequenceNumberRange(new SequenceNumberRange()), new Shard().withShardId("3").withSequenceNumberRange( - new SequenceNumberRange().withEndingSequenceNumber("1"))))); + new SequenceNumberRange().withEndingSequenceNumber("1"))) + ); String shard1Iterator1 = "shard1Iterator1"; String shard1Iterator2 = "shard1Iterator2"; @@ -329,12 +329,10 @@ public PollableChannel kinesisChannel() { public AmazonKinesis amazonKinesisForResharding() { AmazonKinesis amazonKinesis = mock(AmazonKinesis.class); - given(amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(STREAM_FOR_RESHARDING))) - .willReturn(new DescribeStreamResult() - .withStreamDescription(new StreamDescription().withStreamName(STREAM_FOR_RESHARDING) - .withStreamStatus(StreamStatus.ACTIVE).withHasMoreShards(false) - .withShards(new Shard().withShardId("closedShard").withSequenceNumberRange( - new SequenceNumberRange().withEndingSequenceNumber("1"))))); + given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM_FOR_RESHARDING))) + .willReturn(new ListShardsResult() + .withShards(new Shard().withShardId("closedShard").withSequenceNumberRange( + new SequenceNumberRange().withEndingSequenceNumber("1")))); String shard1Iterator1 = "shard1Iterator1"; @@ -370,8 +368,6 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() { return adapter; } - private final AtomicReference shardEndedEventReference = new AtomicReference<>(); - @EventListener public void handleKinesisShardEndedEvent(KinesisShardEndedEvent event) { this.shardEndedEventReference.set(event); diff --git a/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java 
b/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java index 783df506..31ed1c5a 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java @@ -73,11 +73,11 @@ @DirtiesContext public class S3StreamingChannelAdapterTests { + private static final String S3_BUCKET = "S3_BUCKET"; + @TempDir static Path TEMPORARY_FOLDER; - private static final String S3_BUCKET = "S3_BUCKET"; - private static List S3_OBJECTS; @Autowired diff --git a/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java index 15dc9f5b..4897a887 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java @@ -107,7 +107,7 @@ void testSqsMessageDrivenChannelAdapter() { .hasCauseExactlyInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Queue with name 'foo' does not exist"); - assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] { "testQueue" }); + assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] {"testQueue"}); } @Configuration diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java index a081a959..3316fd30 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java @@ -79,7 +79,7 @@ @LocalstackDockerProperties(randomizePorts = true, hostNameResolver = EnvironmentHostNameResolver.class, 
environmentVariableProvider = LocalStackSslEnvironmentProvider.class, - services = { "kinesis", "dynamodb", "cloudwatch" }) + services = {"kinesis", "dynamodb", "cloudwatch"}) @DirtiesContext public class KplKclIntegrationTests { diff --git a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java index d26313f6..339d3d40 100644 --- a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java +++ b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java @@ -66,10 +66,10 @@ @DirtiesContext public class DynamoDbLockRegistryTests { - private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); - private static AmazonDynamoDBAsync DYNAMO_DB; + private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + @Autowired private DynamoDbLockRegistry dynamoDbLockRegistry; diff --git a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java index 7acf710f..8cf323fc 100644 --- a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java +++ b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java @@ -54,10 +54,10 @@ services = "dynamodb") class DynamoDbMetadataStoreTests { - private static AmazonDynamoDBAsync DYNAMO_DB; - private static final String TEST_TABLE = "testMetadataStore"; + private static AmazonDynamoDBAsync DYNAMO_DB; + private static DynamoDbMetadataStore store; private final String file1 = "/remotepath/filesTodownload/file-1.txt"; diff --git a/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java index efacf2b5..f9b5c57d 100644 --- 
a/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java @@ -111,8 +111,6 @@ @DirtiesContext public class S3MessageHandlerTests { - private static SpelExpressionParser PARSER = new SpelExpressionParser(); - // define the bucket and file names used throughout the test private static final String S3_BUCKET_NAME = "myBucket"; @@ -123,6 +121,8 @@ public class S3MessageHandlerTests { @TempDir static Path temporaryFolder; + private static SpelExpressionParser PARSER = new SpelExpressionParser(); + @Autowired private AmazonS3 amazonS3; diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml index 0149189d..212f97a2 100644 --- a/src/test/resources/log4j2-test.xml +++ b/src/test/resources/log4j2-test.xml @@ -2,7 +2,7 @@ - + @@ -10,7 +10,7 @@ - +