From 7d873907bdfba38def0a2338861d244d40f7a55d Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Thu, 11 Jun 2020 00:14:21 +0200 Subject: [PATCH 1/8] fix --- .../KinesisMessageDrivenChannelAdapter.java | 621 ++++++++++-------- ...nesisMessageDrivenChannelAdapterTests.java | 36 +- 2 files changed, 375 insertions(+), 282 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index e8bb6c50..5ecd01ac 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -44,6 +44,19 @@ import javax.annotation.Nullable; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; + import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; @@ -70,23 +83,9 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.StreamStatus; - /** - * The {@link MessageProducerSupport} implementation for receiving data from Amazon - * Kinesis stream(s). + * The {@link MessageProducerSupport} implementation for receiving data from Amazon Kinesis + * stream(s). * * @author Artem Bilan * @author Krzysztof Witkowski @@ -115,8 +114,10 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport private final ShardConsumerManager shardConsumerManager = new ShardConsumerManager(); - private final ExecutorService shardLocksExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory( - (getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-")); + private final ExecutorService shardLocksExecutor = + Executors.newSingleThreadExecutor( + new CustomizableThreadFactory( + (getComponentName() == null ? "" : getComponentName()) + "-kinesis-shard-locks-")); private String consumerGroup = "SpringIntegration"; @@ -179,13 +180,16 @@ public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, String... this.streams = Arrays.copyOf(streams, streams.length); } - public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets) { + public KinesisMessageDrivenChannelAdapter( + AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets) { Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null."); Assert.notEmpty(shardOffsets, "'shardOffsets' must not be null."); Assert.noNullElements(shardOffsets, "'shardOffsets' must not contain null elements."); for (KinesisShardOffset shardOffset : shardOffsets) { - Assert.isTrue(StringUtils.hasText(shardOffset.getStream()) && StringUtils.hasText(shardOffset.getShard()), + Assert.isTrue( + StringUtils.hasText(shardOffset.getStream()) + && StringUtils.hasText(shardOffset.getShard()), "The 'shardOffsets' must be provided with particular 'stream' and 'shard' values."); this.shardOffsets.add(new KinesisShardOffset(shardOffset)); } @@ -225,8 +229,9 @@ public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence) { } /** - * Specify a {@link Converter} to deserialize the {@code byte[]} from record's body. - * Can be {@code null} meaning no deserialization. + * Specify a {@link Converter} to deserialize the {@code byte[]} from record's body. Can be {@code + * null} meaning no deserialization. + * * @param converter the {@link Converter} to use or null */ public void setConverter(Converter converter) { @@ -245,6 +250,7 @@ public void setCheckpointMode(CheckpointMode checkpointMode) { /** * Sets the interval between 2 checkpoints. Only used when checkpointMode is periodic. + * * @param checkpointsInterval interval between 2 checkpoints (in milliseconds) * @since 2.2 */ @@ -253,8 +259,8 @@ public void setCheckpointsInterval(long checkpointsInterval) { } /** - * The maximum record to poll per on get-records request. Not greater then - * {@code 10000}. + * The maximum record to poll per on get-records request. Not greater then {@code 10000}. + * * @param recordsLimit the number of records to for per on get-records request. * @see GetRecordsRequest#setLimit */ @@ -282,11 +288,12 @@ public void setStartTimeout(int startTimeout) { } /** - * The maximum number of concurrent {@link ConsumerInvoker}s running. The - * {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s. - * Messages from within the same shard will be processed sequentially. In other words - * each shard is tied with the particular thread. By default the concurrency is - * unlimited and shard is processed in the {@link #consumerExecutor} directly. + * The maximum number of concurrent {@link ConsumerInvoker}s running. The {@link ShardConsumer}s + * are evenly distributed between {@link ConsumerInvoker}s. Messages from within the same shard + * will be processed sequentially. In other words each shard is tied with the particular thread. + * By default the concurrency is unlimited and shard is processed in the {@link #consumerExecutor} + * directly. + * * @param concurrency the concurrency maximum number */ public void setConcurrency(int concurrency) { @@ -294,8 +301,9 @@ public void setConcurrency(int concurrency) { } /** - * The sleep interval in milliseconds used in the main loop between shards polling - * cycles. Defaults to {@code 1000}l minimum {@code 250}. + * The sleep interval in milliseconds used in the main loop between shards polling cycles. + * Defaults to {@code 1000}l minimum {@code 250}. + * * @param idleBetweenPolls the interval to sleep between shards polling cycles. */ public void setIdleBetweenPolls(int idleBetweenPolls) { @@ -303,8 +311,9 @@ public void setIdleBetweenPolls(int idleBetweenPolls) { } /** - * Specify an {@link InboundMessageMapper} to extract message headers embedded into - * the record data. + * Specify an {@link InboundMessageMapper} to extract message headers embedded into the record + * data. + * * @param embeddedHeadersMapper the {@link InboundMessageMapper} to use. * @since 2.0 */ @@ -313,8 +322,9 @@ public void setEmbeddedHeadersMapper(InboundMessageMapper embeddedHeader } /** - * Specify a {@link LockRegistry} for an exclusive access to provided streams. This is - * not used when shards-based configuration is provided. + * Specify a {@link LockRegistry} for an exclusive access to provided streams. This is not used + * when shards-based configuration is provided. + * * @param lockRegistry the {@link LockRegistry} to use. * @since 2.0 */ @@ -323,9 +333,9 @@ public void setLockRegistry(LockRegistry lockRegistry) { } /** - * Set to true to bind the source consumer record in the header named - * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. Does not apply to batch - * listeners. + * Set to true to bind the source consumer record in the header named {@link + * IntegrationMessageHeaderAccessor#SOURCE_DATA}. Does not apply to batch listeners. + * * @param bindSourceRecord true to bind. * @since 2.2 */ @@ -338,12 +348,16 @@ protected void onInit() { super.onInit(); if (this.consumerExecutor == null) { - this.consumerExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory( - (getComponentName() == null ? "" : getComponentName()) + "-kinesis-consumer-")); + this.consumerExecutor = + Executors.newCachedThreadPool( + new CustomizableThreadFactory( + (getComponentName() == null ? "" : getComponentName()) + "-kinesis-consumer-")); } if (this.dispatcherExecutor == null) { - this.dispatcherExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory( - (getComponentName() == null ? "" : getComponentName()) + "-kinesis-dispatcher-")); + this.dispatcherExecutor = + Executors.newCachedThreadPool( + new CustomizableThreadFactory( + (getComponentName() == null ? "" : getComponentName()) + "-kinesis-dispatcher-")); } if (this.streams == null) { @@ -366,14 +380,19 @@ public void destroy() { @ManagedOperation public void stopConsumer(String stream, String shard) { - ShardConsumer shardConsumer = this.shardConsumers.remove(KinesisShardOffset.latest(stream, shard)); + ShardConsumer shardConsumer = + this.shardConsumers.remove(KinesisShardOffset.latest(stream, shard)); if (shardConsumer != null) { shardConsumer.stop(); } else { if (this.logger.isDebugEnabled()) { this.logger.debug( - "There is no ShardConsumer for shard [" + shard + "] in stream [" + shard + "] to stop."); + "There is no ShardConsumer for shard [" + + shard + + "] in stream [" + + shard + + "] to stop."); } } } @@ -410,19 +429,28 @@ public void resetCheckpointForShardToTrimHorizon(String stream, String shard) { } @ManagedOperation - public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber) { - restartShardConsumerForOffset(KinesisShardOffset.atSequenceNumber(stream, shard, sequenceNumber)); + public void resetCheckpointForShardToSequenceNumber( + String stream, String shard, String sequenceNumber) { + restartShardConsumerForOffset( + KinesisShardOffset.atSequenceNumber(stream, shard, sequenceNumber)); } @ManagedOperation public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp) { - restartShardConsumerForOffset(KinesisShardOffset.atTimestamp(stream, shard, new Date(timestamp))); + restartShardConsumerForOffset( + KinesisShardOffset.atTimestamp(stream, shard, new Date(timestamp))); } private void restartShardConsumerForOffset(KinesisShardOffset shardOffset) { - Assert.isTrue(this.shardOffsets.contains(shardOffset), - "The [" + KinesisMessageDrivenChannelAdapter.this + "] doesn't operate shard [" + shardOffset.getShard() - + "] for stream [" + shardOffset.getStream() + "]"); + Assert.isTrue( + this.shardOffsets.contains(shardOffset), + "The [" + + KinesisMessageDrivenChannelAdapter.this + + "] doesn't operate shard [" + + shardOffset.getShard() + + "] for stream [" + + shardOffset.getStream() + + "]"); if (logger.isDebugEnabled()) { logger.debug("Resetting consumer for [" + shardOffset + "]..."); @@ -454,10 +482,12 @@ public void resetCheckpoints() { @Override protected void doStart() { super.doStart(); - if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) { + if (ListenerMode.batch.equals(this.listenerMode) + && CheckpointMode.record.equals(this.checkpointMode)) { this.checkpointMode = CheckpointMode.batch; - logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] " - + "because it does not make sense in case of [ListenerMode.batch]."); + logger.warn( + "The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] " + + "because it does not make sense in case of [ListenerMode.batch]."); } if (this.streams != null) { @@ -499,6 +529,51 @@ private Collection shardConsumerSubset(int i) { } } + private List readShardList(String stream) { + return this.readShardList(stream, 0); + } + + private List readShardList(String stream, int retryCount) { + List shardList = new ArrayList<>(); + + if (retryCount > this.describeStreamRetries) { + throw new IllegalStateException( + "Kinesis could not read shards from stream with name:[" + stream + "] "); + } + + ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(stream); + + try { + ListShardsResult listShardsResult = this.amazonKinesis.listShards(listShardsRequest); + + shardList.addAll(listShardsResult.getShards()); + + } + catch (LimitExceededException limitExceededException) { + + logger.info( + "Got LimitExceededException when listing stream [" + + stream + + "]. " + + "Backing off for [" + + this.describeStreamBackoff + + "] millis."); + + try { + Thread.sleep(this.describeStreamBackoff); + readShardList(stream, retryCount++); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "The [describeStream] thread for the stream [" + stream + "] has been interrupted.", + ex); + } + } + + return shardList; + } + private void populateShardsForStreams() { this.shardOffsets.clear(); final CountDownLatch shardsGatherLatch = new CountDownLatch(this.streams.length); @@ -507,103 +582,89 @@ private void populateShardsForStreams() { } try { if (!shardsGatherLatch.await(this.startTimeout, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("The [ " + KinesisMessageDrivenChannelAdapter.this - + "] could not start during timeout: " + this.startTimeout); + throw new IllegalStateException( + "The [ " + + KinesisMessageDrivenChannelAdapter.this + + "] could not start during timeout: " + + this.startTimeout); } } catch (InterruptedException e) { throw new IllegalStateException( - "The [ " + KinesisMessageDrivenChannelAdapter.this + "] has been interrupted from start."); + "The [ " + + KinesisMessageDrivenChannelAdapter.this + + "] has been interrupted from start."); } } - private void populateShardsForStream(final String stream, final CountDownLatch shardsGatherLatch) { - this.dispatcherExecutor.execute(() -> { - try { - int describeStreamRetries = 0; - List shardsToConsume = new ArrayList<>(); + private List detectShardsToConsume(String stream) { + return detectShardsToConsume(stream, 0); + } - String exclusiveStartShardId = null; - while (true) { - DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(stream) - .withExclusiveStartShardId(exclusiveStartShardId); + private List detectShardsToConsume(String stream, int retry) { + List shardsToConsume = new ArrayList<>(); - DescribeStreamResult describeStreamResult = null; - // Call DescribeStream, with backoff and retries (if we get - // LimitExceededException). - try { - describeStreamResult = this.amazonKinesis.describeStream(describeStreamRequest); - } - catch (Exception e) { - logger.info("Got an exception when describing stream [" + stream + "]. " + "Backing off for [" - + this.describeStreamBackoff + "] millis.", e); - } + List shards = readShardList(stream); - if (describeStreamResult == null || !StreamStatus.ACTIVE.toString() - .equals(describeStreamResult.getStreamDescription().getStreamStatus())) { + try { + for (Shard shard : shards) { + String key = buildCheckpointKeyForShard(stream, shard.getShardId()); + String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); + if (endingSequenceNumber != null) { + String checkpoint = this.checkpointStore.get(key); + + boolean skipClosedShard = checkpoint != null && new BigInteger(endingSequenceNumber) + .compareTo(new BigInteger(checkpoint)) <= 0; + + if (logger.isTraceEnabled()) { + logger.trace("The shard [" + shard + "] in stream [" + stream + + "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber + + "].\nThe last processed checkpoint is [" + checkpoint + "]." + + (skipClosedShard ? "\nThe shard will be skipped." : "")); + } - if (describeStreamRetries++ > this.describeStreamRetries) { - ResourceNotFoundException resourceNotFoundException = new ResourceNotFoundException( - "The stream [" + stream + "] isn't ACTIVE or doesn't exist."); - resourceNotFoundException.setServiceName("Kinesis"); - throw resourceNotFoundException; - } - try { - Thread.sleep(this.describeStreamBackoff); - continue; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException( - "The [describeStream] thread for the stream [" + stream + "] has been interrupted.", - e); - } + if (skipClosedShard) { + // Skip CLOSED shard which has been read before + // according a checkpoint + continue; } + } - List shards = describeStreamResult.getStreamDescription().getShards(); + shardsToConsume.add(shard); + } + } + catch (Exception e) { + String exceptionMessage = "Got an exception when processing shards in stream [" + stream + "]"; + logger.info(exceptionMessage + ".\n Retrying... ", e); + if (retry > 5) { + throw new IllegalStateException("Error processing shards in stream [\" + stream + \"].", e); + } + //Retry + detectShardsToConsume(stream, retry++); + sleep(this.describeStreamBackoff, new IllegalStateException(exceptionMessage), false); + } - try { - for (Shard shard : shards) { - String key = buildCheckpointKeyForShard(stream, shard.getShardId()); - String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber(); - if (endingSequenceNumber != null) { - String checkpoint = this.checkpointStore.get(key); - - boolean skipClosedShard = checkpoint != null && new BigInteger(endingSequenceNumber) - .compareTo(new BigInteger(checkpoint)) <= 0; - - if (logger.isTraceEnabled()) { - logger.trace("The shard [" + shard + "] in stream [" + stream - + "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber - + "].\nThe last processed checkpoint is [" + checkpoint + "]." - + (skipClosedShard ? "\nThe shard will be skipped." : "")); - } + return shardsToConsume; - if (skipClosedShard) { - // Skip CLOSED shard which has been read before - // according a checkpoint - continue; - } - } + } - shardsToConsume.add(shard); - } - } - catch (Exception e) { - logger.info( - "Got an exception when processing shards in stream [" + stream + "].\n" + "Retrying...", - e); - continue; - } + private void sleep(long sleepAmount, RuntimeException error, boolean interruptThread) { + try { + Thread.sleep(sleepAmount); + } + catch (Exception e) { + if (interruptThread) { + Thread.currentThread().interrupt(); + } + logger.error(error.getMessage(), e); + throw error; + } + } - if (describeStreamResult.getStreamDescription().getHasMoreShards()) { - exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); - describeStreamRetries = 0; - } - else { - break; - } - } + private void populateShardsForStream(final String stream, final CountDownLatch shardsGatherLatch) { + this.dispatcherExecutor.execute(() -> { + try { + List shardsToConsume = detectShardsToConsume(stream); for (Shard shard : shardsToConsume) { KinesisShardOffset shardOffset = new KinesisShardOffset(this.streamInitialSequence); @@ -644,7 +705,8 @@ private void populateConsumer(KinesisShardOffset shardOffset) { if (this.active) { synchronized (this.consumerInvokers) { if (this.consumerInvokers.size() < this.maxConcurrency) { - ConsumerInvoker consumerInvoker = new ConsumerInvoker(Collections.singleton(shardConsumer)); + ConsumerInvoker consumerInvoker = + new ConsumerInvoker(Collections.singleton(shardConsumer)); this.consumerInvokers.add(consumerInvoker); this.consumerExecutor.execute(consumerInvoker); } @@ -692,8 +754,9 @@ private void stopConsumers() { } /** - * If there's an error channel, we create a new attributes holder here. Then set the - * attributes for use by the {@link ErrorMessageStrategy}. + * If there's an error channel, we create a new attributes holder here. Then set the attributes + * for use by the {@link ErrorMessageStrategy}. + * * @param record the Kinesis record to use. * @param message the Spring Messaging message to use. */ @@ -706,7 +769,8 @@ private void setAttributesIfNecessary(Object record, Message message) { } @Override - protected AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message message) { + protected AttributeAccessor getErrorMessageAttributes( + org.springframework.messaging.Message message) { AttributeAccessor attributes = attributesHolder.get(); if (attributes == null) { return super.getErrorMessageAttributes(message); @@ -718,8 +782,13 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag @Override public String toString() { - return "KinesisMessageDrivenChannelAdapter{" + "shardOffsets=" + this.shardOffsets + ", consumerGroup='" - + this.consumerGroup + '\'' + '}'; + return "KinesisMessageDrivenChannelAdapter{" + + "shardOffsets=" + + this.shardOffsets + + ", consumerGroup='" + + this.consumerGroup + + '\'' + + '}'; } private final class ConsumerDispatcher implements SchedulingAwareRunnable { @@ -741,8 +810,8 @@ public void run() { } } - Iterator iterator = KinesisMessageDrivenChannelAdapter.this.shardConsumers.values() - .iterator(); + Iterator iterator = + KinesisMessageDrivenChannelAdapter.this.shardConsumers.values().iterator(); while (iterator.hasNext()) { ShardConsumer shardConsumer = iterator.next(); shardConsumer.execute(); @@ -763,14 +832,8 @@ public void run() { } } } - - try { - Thread.sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("ConsumerDispatcher Thread [" + this + "] has been interrupted", e); - } + String errorMsg = "ConsumerDispatcher Thread [" + this + "] has been interrupted"; + sleep(KinesisMessageDrivenChannelAdapter.this.idleBetweenPolls, new IllegalStateException(errorMsg), true); } } @@ -778,7 +841,6 @@ public void run() { public boolean isLongLived() { return true; } - } private final class ShardConsumer { @@ -806,8 +868,8 @@ private final class ShardConsumer { ShardConsumer(KinesisShardOffset shardOffset) { this.shardOffset = new KinesisShardOffset(shardOffset); this.key = buildCheckpointKeyForShard(shardOffset.getStream(), shardOffset.getShard()); - this.checkpointer = new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, - this.key); + this.checkpointer = + new ShardCheckpointer(KinesisMessageDrivenChannelAdapter.this.checkpointStore, this.key); } void setNotifier(Runnable notifier) { @@ -832,36 +894,39 @@ void close() { void execute() { if (this.task == null) { switch (this.state) { - case NEW: case EXPIRED: - this.task = () -> { - try { - if (this.shardOffset.isReset()) { - this.checkpointer.remove(); - } - else { - String checkpoint = this.checkpointer.getCheckpoint(); - if (checkpoint != null) { - this.shardOffset.setSequenceNumber(checkpoint); - this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + this.task = + () -> { + try { + if (this.shardOffset.isReset()) { + this.checkpointer.remove(); + } + else { + String checkpoint = this.checkpointer.getCheckpoint(); + if (checkpoint != null) { + this.shardOffset.setSequenceNumber(checkpoint); + this.shardOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + } + } + if (logger.isInfoEnabled() && this.state == ConsumerState.NEW) { + logger.info("The [" + this + "] has been started."); + } + GetShardIteratorRequest shardIteratorRequest = + this.shardOffset.toShardIteratorRequest(); + this.shardIterator = + KinesisMessageDrivenChannelAdapter.this + .amazonKinesis + .getShardIterator(shardIteratorRequest) + .getShardIterator(); + if (ConsumerState.STOP != this.state) { + this.state = ConsumerState.CONSUME; + } } - } - if (logger.isInfoEnabled() && this.state == ConsumerState.NEW) { - logger.info("The [" + this + "] has been started."); - } - GetShardIteratorRequest shardIteratorRequest = - this.shardOffset.toShardIteratorRequest(); - this.shardIterator = KinesisMessageDrivenChannelAdapter.this.amazonKinesis - .getShardIterator(shardIteratorRequest).getShardIterator(); - if (ConsumerState.STOP != this.state) { - this.state = ConsumerState.CONSUME; - } - } - finally { - this.task = null; - } - }; + finally { + this.task = null; + } + }; break; case CONSUME: @@ -878,9 +943,12 @@ void execute() { case STOP: if (this.shardIterator == null) { if (logger.isInfoEnabled()) { - logger.info("Stopping the [" + this + "] on the checkpoint [" - + this.checkpointer.getCheckpoint() - + "] because the shard has been CLOSED and exhausted."); + logger.info( + "Stopping the [" + + this + + "] on the checkpoint [" + + this.checkpointer.getCheckpoint() + + "] because the shard has been CLOSED and exhausted."); } } else { @@ -934,18 +1002,22 @@ private Runnable processTask() { // Shard is closed: nothing to consume any more. // Resharding is possible. if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) { - KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher - .publishEvent(new KinesisShardEndedEvent(KinesisMessageDrivenChannelAdapter.this, - this.key)); + KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent( + new KinesisShardEndedEvent(KinesisMessageDrivenChannelAdapter.this, this.key)); } stop(); } if (ConsumerState.STOP != this.state && result.getRecords().isEmpty()) { if (logger.isDebugEnabled()) { - logger.debug("No records for [" + this + "] on sequenceNumber [" - + this.checkpointer.getLastCheckpointValue() + "]. Suspend consuming for [" - + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + "] milliseconds."); + logger.debug( + "No records for [" + + this + + "] on sequenceNumber [" + + this.checkpointer.getLastCheckpointValue() + + "]. Suspend consuming for [" + + KinesisMessageDrivenChannelAdapter.this.consumerBackoff + + "] milliseconds."); } prepareSleepState(); } @@ -966,15 +1038,21 @@ private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { // Lets acquire iterator again (using checkpointer for iterator start // sequence number). if (logger.isInfoEnabled()) { - logger.info("Shard iterator for [" + ShardConsumer.this + "] expired.\n" - + "A new one will be started from the check pointed sequence number."); + logger.info( + "Shard iterator for [" + + ShardConsumer.this + + "] expired.\n" + + "A new one will be started from the check pointed sequence number."); } this.state = ConsumerState.EXPIRED; } catch (ProvisionedThroughputExceededException e) { if (logger.isWarnEnabled()) { - logger.warn("GetRecords request throttled for [" + ShardConsumer.this + "] with the reason: " - + e.getErrorMessage()); + logger.warn( + "GetRecords request throttled for [" + + ShardConsumer.this + + "] with the reason: " + + e.getErrorMessage()); } // We are throttled, so let's sleep prepareSleepState(); @@ -984,8 +1062,8 @@ private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { } private void prepareSleepState() { - ShardConsumer.this.sleepUntil = System.currentTimeMillis() - + KinesisMessageDrivenChannelAdapter.this.consumerBackoff; + ShardConsumer.this.sleepUntil = + System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.consumerBackoff; ShardConsumer.this.state = ConsumerState.SLEEP; } @@ -1015,7 +1093,8 @@ private void processSingleRecord(Record record) { } private void processMultipleRecords(List records) { - AbstractIntegrationMessageBuilder messageBuilder = getMessageBuilderFactory().withPayload(records); + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory().withPayload(records); if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) { List> payload = records.stream() @@ -1029,18 +1108,23 @@ else if (KinesisMessageDrivenChannelAdapter.this.converter != null) { final List partitionKeys = new ArrayList<>(); final List sequenceNumbers = new ArrayList<>(); - List payload = records.stream() - .map(r -> { - partitionKeys.add(r.getPartitionKey()); - sequenceNumbers.add(r.getSequenceNumber()); - - return KinesisMessageDrivenChannelAdapter.this.converter.convert(r.getData().array()); - }) - .collect(Collectors.toList()); + List payload = + records.stream() + .map( + r -> { + partitionKeys.add(r.getPartitionKey()); + sequenceNumbers.add(r.getSequenceNumber()); + + return KinesisMessageDrivenChannelAdapter.this.converter.convert( + r.getData().array()); + }) + .collect(Collectors.toList()); - messageBuilder = getMessageBuilderFactory().withPayload(payload) - .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys) - .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, sequenceNumbers); + messageBuilder = + getMessageBuilderFactory() + .withPayload(payload) + .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys) + .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, sequenceNumbers); } performSend(messageBuilder, records); @@ -1052,8 +1136,9 @@ private AbstractIntegrationMessageBuilder prepareMessageForRecord(Record if (KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) { try { - messageToUse = KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper - .toMessage((byte[]) payload); + messageToUse = + KinesisMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage( + (byte[]) payload); payload = messageToUse.getPayload(); } @@ -1066,9 +1151,11 @@ private AbstractIntegrationMessageBuilder prepareMessageForRecord(Record payload = KinesisMessageDrivenChannelAdapter.this.converter.convert((byte[]) payload); } - AbstractIntegrationMessageBuilder messageBuilder = getMessageBuilderFactory().withPayload(payload) - .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey()) - .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber()); + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory() + .withPayload(payload) + .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey()) + .setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber()); if (KinesisMessageDrivenChannelAdapter.this.bindSourceRecord) { messageBuilder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); @@ -1081,8 +1168,10 @@ private AbstractIntegrationMessageBuilder prepareMessageForRecord(Record return messageBuilder; } - private void performSend(AbstractIntegrationMessageBuilder messageBuilder, Object rawRecord) { - messageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream()) + private void performSend( + AbstractIntegrationMessageBuilder messageBuilder, Object rawRecord) { + messageBuilder + .setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream()) .setHeader(AwsHeaders.SHARD, this.shardOffset.getShard()); if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) { @@ -1095,8 +1184,15 @@ private void performSend(AbstractIntegrationMessageBuilder messageBuilder, Ob sendMessage(messageToSend); } catch (Exception e) { - logger.info("Got an exception during sending a '" + messageToSend + "'" + "\nfor the '" + rawRecord - + "'.\n" + "Consider to use 'errorChannel' flow for the compensation logic.", e); + logger.info( + "Got an exception during sending a '" + + messageToSend + + "'" + + "\nfor the '" + + rawRecord + + "'.\n" + + "Consider to use 'errorChannel' flow for the compensation logic.", + e); } } @@ -1122,7 +1218,8 @@ private void checkpointIfPeriodicMode(@Nullable Record record) { this.checkpointer.checkpoint(record.getSequenceNumber()); } this.nextCheckpointTimeInMillis = - System.currentTimeMillis() + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval; + System.currentTimeMillis() + + KinesisMessageDrivenChannelAdapter.this.checkpointsInterval; } } @@ -1130,13 +1227,14 @@ private void checkpointIfPeriodicMode(@Nullable Record record) { public String toString() { return "ShardConsumer{" + "shardOffset=" + this.shardOffset + ", state=" + this.state + '}'; } - } private enum ConsumerState { - - NEW, EXPIRED, CONSUME, SLEEP, STOP - + NEW, + EXPIRED, + CONSUME, + SLEEP, + STOP } private final class ConsumerInvoker implements SchedulingAwareRunnable { @@ -1171,7 +1269,8 @@ public void run() { catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IllegalStateException("ConsumerInvoker thread [" + this + "] has been interrupted", e); + throw new IllegalStateException( + "ConsumerInvoker thread [" + this + "] has been interrupted", e); } for (Iterator iterator = this.consumers.iterator(); iterator.hasNext(); ) { @@ -1185,8 +1284,13 @@ public void run() { shardConsumer.task.run(); } catch (Exception e) { - logger.info("Got an exception " + e + " during [" + shardConsumer + "] task invocation" - + ".\nProcess will be retried on the next iteration."); + logger.info( + "Got an exception " + + e + + " during [" + + shardConsumer + + "] task invocation" + + ".\nProcess will be retried on the next iteration."); } } } @@ -1209,12 +1313,12 @@ public void run() { public boolean isLongLived() { return true; } - } private final class ShardConsumerManager implements SchedulingAwareRunnable { - private final Map shardOffsetsToConsumer = new ConcurrentHashMap<>(); + private final Map shardOffsetsToConsumer = + new ConcurrentHashMap<>(); private final Map locks = new HashMap<>(); @@ -1224,7 +1328,8 @@ private final class ShardConsumerManager implements SchedulingAwareRunnable { } void addShardToConsume(KinesisShardOffset kinesisShardOffset) { - String lockKey = buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard()); + String lockKey = + buildCheckpointKeyForShard(kinesisShardOffset.getStream(), kinesisShardOffset.getShard()); this.shardOffsetsToConsumer.put(lockKey, kinesisShardOffset); } @@ -1237,31 +1342,34 @@ public void run() { try { while (!Thread.currentThread().isInterrupted()) { - this.shardOffsetsToConsumer.entrySet().removeIf(entry -> { - boolean remove = true; - if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) { - String key = entry.getKey(); - Lock lock = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain(key); - try { - if (lock.tryLock()) { - this.locks.put(key, lock); - } - else { - remove = false; - } - - } - catch (Exception e) { - logger.error("Error during locking: " + lock, e); - } - } - - if (remove) { - populateConsumer(entry.getValue()); - } - - return remove; - }); + this.shardOffsetsToConsumer + .entrySet() + .removeIf( + entry -> { + boolean remove = true; + if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) { + String key = entry.getKey(); + Lock lock = KinesisMessageDrivenChannelAdapter.this.lockRegistry.obtain(key); + try { + if (lock.tryLock()) { + this.locks.put(key, lock); + } + else { + remove = false; + } + + } + catch (Exception e) { + logger.error("Error during locking: " + lock, e); + } + } + + if (remove) { + populateConsumer(entry.getValue()); + } + + return remove; + }); while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) { String lockKey = this.forUnlocking.poll(); @@ -1281,14 +1389,7 @@ public void run() { } } - try { - Thread.sleep(250); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException( - "ShardConsumerManager Thread [" + this + "] has been interrupted", e); - } + sleep(250, new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"), true); } } finally { @@ -1311,7 +1412,5 @@ public void run() { public boolean isLongLived() { return true; } - } - } diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index 3fdae5ae..a22ffb99 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,18 +60,16 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.StreamDescription; -import com.amazonaws.services.kinesis.model.StreamStatus; /** * @author Artem Bilan @@ -109,7 +107,7 @@ void setup() { } @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) void testKinesisMessageDrivenChannelAdapter() { this.kinesisMessageDrivenChannelAdapter.start(); final Set shardOffsets = TestUtils.getPropertyValue(this.kinesisMessageDrivenChannelAdapter, @@ -214,15 +212,15 @@ void testResharding() throws InterruptedException { assertThat(n).isLessThan(100); // When resharding happens the describeStream() is performed again - verify(this.amazonKinesisForResharding, atLeast(1)).describeStream(any(DescribeStreamRequest.class)); + verify(this.amazonKinesisForResharding, atLeast(1)).listShards(any(ListShardsRequest.class)); this.reshardingChannelAdapter.stop(); KinesisShardEndedEvent kinesisShardEndedEvent = this.config.shardEndedEventReference.get(); assertThat(kinesisShardEndedEvent).isNotNull() - .extracting(KinesisShardEndedEvent::getShardKey) - .isEqualTo("SpringIntegration:streamForResharding:closedShard"); + .extracting(KinesisShardEndedEvent::getShardKey) + .isEqualTo("SpringIntegration:streamForResharding:closedShard"); } @Configuration @@ -233,15 +231,13 @@ public static class Config { public AmazonKinesis amazonKinesis() { AmazonKinesis amazonKinesis = mock(AmazonKinesis.class); - given(amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(STREAM1))).willReturn( - new DescribeStreamResult().withStreamDescription( - new StreamDescription().withStreamName(STREAM1).withStreamStatus(StreamStatus.UPDATING)), - new DescribeStreamResult().withStreamDescription(new StreamDescription().withStreamName(STREAM1) - .withStreamStatus(StreamStatus.ACTIVE).withHasMoreShards(false) + given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM1))).willReturn( + new ListShardsResult() .withShards(new Shard().withShardId("1").withSequenceNumberRange(new SequenceNumberRange()), new Shard().withShardId("2").withSequenceNumberRange(new SequenceNumberRange()), new Shard().withShardId("3").withSequenceNumberRange( - new SequenceNumberRange().withEndingSequenceNumber("1"))))); + new SequenceNumberRange().withEndingSequenceNumber("1"))) + ); String shard1Iterator1 = "shard1Iterator1"; String shard1Iterator2 = "shard1Iterator2"; @@ -329,12 +325,10 @@ public PollableChannel kinesisChannel() { public AmazonKinesis amazonKinesisForResharding() { AmazonKinesis amazonKinesis = mock(AmazonKinesis.class); - given(amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(STREAM_FOR_RESHARDING))) - .willReturn(new DescribeStreamResult() - .withStreamDescription(new StreamDescription().withStreamName(STREAM_FOR_RESHARDING) - .withStreamStatus(StreamStatus.ACTIVE).withHasMoreShards(false) - .withShards(new Shard().withShardId("closedShard").withSequenceNumberRange( - new SequenceNumberRange().withEndingSequenceNumber("1"))))); + given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM_FOR_RESHARDING))) + .willReturn(new ListShardsResult() + .withShards(new Shard().withShardId("closedShard").withSequenceNumberRange( + new SequenceNumberRange().withEndingSequenceNumber("1")))); String shard1Iterator1 = "shard1Iterator1"; From 8477d38b90f877a359bfe6058bee5ead44cfde0a Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Thu, 11 Jun 2020 01:08:25 +0200 Subject: [PATCH 2/8] checkstyle fixes --- .../aws/config/xml/AwsParserUtils.java | 2 +- .../xml/S3OutboundChannelAdapterParser.java | 2 +- .../config/xml/S3OutboundGatewayParser.java | 2 +- .../xml/SnsInboundChannelAdapterParser.java | 2 +- .../xml/SnsOutboundChannelAdapterParser.java | 2 +- .../SqsMessageDrivenChannelAdapterParser.java | 2 +- .../xml/SqsOutboundChannelAdapterParser.java | 2 +- .../inbound/S3InboundFileSynchronizer.java | 2 +- ...InboundFileSynchronizingMessageSource.java | 2 +- .../aws/inbound/S3StreamingMessageSource.java | 2 +- .../aws/inbound/SnsInboundChannelAdapter.java | 2 +- .../SqsMessageDrivenChannelAdapter.java | 2 +- .../KclMessageDrivenChannelAdapter.java | 2 +- .../KinesisMessageDrivenChannelAdapter.java | 50 +++--- .../inbound/kinesis/KinesisShardOffset.java | 146 +++++++++--------- .../inbound/kinesis/ShardCheckpointer.java | 2 +- .../aws/metadata/DynamoDbMetadataStore.java | 2 +- .../aws/outbound/KinesisMessageHandler.java | 2 +- .../aws/outbound/KplMessageHandler.java | 2 +- ...AbstractMessageAttributesHeaderMapper.java | 6 +- .../support/AwsRequestFailureException.java | 2 +- .../integration/aws/support/S3FileInfo.java | 2 +- .../aws/support/S3RemoteFileTemplate.java | 2 +- .../aws/support/SnsBodyBuilder.java | 2 +- .../aws/support/SqsHeaderMapper.java | 2 +- .../S3PersistentAcceptOnceFileListFilter.java | 2 +- .../filters/S3RegexPatternFileListFilter.java | 2 +- .../S3SimplePatternFileListFilter.java | 2 +- .../aws/EnvironmentHostNameResolver.java | 2 +- .../aws/ExtendedDockerTestUtils.java | 2 +- .../aws/LocalStackSslEnvironmentProvider.java | 10 +- ...nesisMessageDrivenChannelAdapterTests.java | 4 +- .../S3StreamingChannelAdapterTests.java | 6 +- .../SnsInboundChannelAdapterTests.java | 2 +- .../SqsMessageDrivenChannelAdapterTests.java | 4 +- .../aws/kinesis/KinesisIntegrationTests.java | 2 +- .../aws/kinesis/KplKclIntegrationTests.java | 4 +- ...amoDbLockRegistryLeaderInitiatorTests.java | 2 +- .../aws/lock/DynamoDbLockRegistryTests.java | 6 +- .../metadata/DynamoDbMetadataStoreTests.java | 6 +- .../outbound/KinesisMessageHandlerTests.java | 2 +- .../KinesisProducingMessageHandlerTests.java | 2 +- .../aws/outbound/S3MessageHandlerTests.java | 6 +- .../aws/outbound/SnsMessageBuilderTests.java | 2 +- .../aws/outbound/SnsMessageHandlerTests.java | 2 +- .../aws/outbound/SqsMessageHandlerTests.java | 2 +- src/test/resources/log4j2-test.xml | 4 +- 47 files changed, 161 insertions(+), 161 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java b/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java index 92e33eec..6d28ae3a 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java index 92e384ce..53e95d50 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java index be437160..b5581518 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java index 15a85338..ed1858b0 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java index 0bf1576c..1a25ae29 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java index 1e437fdd..894f9362 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java index cdaac23f..974dc0b9 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java index dbce24a9..658aa079 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java +++ b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java index cc848ea8..02b14de4 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java +++ b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java b/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java index 4a14bbac..1ba7a4ee 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java +++ b/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java index dad30245..db7881e6 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java index 5f9c1216..e0413f88 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index 5bded3d0..60136614 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index 5ecd01ac..db4bc0e7 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -44,19 +44,6 @@ import javax.annotation.Nullable; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.LimitExceededException; -import com.amazonaws.services.kinesis.model.ListShardsRequest; -import com.amazonaws.services.kinesis.model.ListShardsResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; - import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; @@ -83,6 +70,19 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; + /** * The {@link MessageProducerSupport} implementation for receiving data from Amazon Kinesis * stream(s). @@ -791,6 +791,14 @@ public String toString() { + '}'; } + private enum ConsumerState { + NEW, + EXPIRED, + CONSUME, + SLEEP, + STOP + } + private final class ConsumerDispatcher implements SchedulingAwareRunnable { private final Set inReshardingProcess = new HashSet<>(); @@ -849,12 +857,10 @@ private final class ShardConsumer { private final ShardCheckpointer checkpointer; - private long nextCheckpointTimeInMillis; - - private final Runnable processTask = processTask(); - private final String key; + private long nextCheckpointTimeInMillis; + private Runnable notifier; private volatile ConsumerState state = ConsumerState.NEW; @@ -865,6 +871,8 @@ private final class ShardConsumer { private volatile long sleepUntil; + private final Runnable processTask = processTask(); + ShardConsumer(KinesisShardOffset shardOffset) { this.shardOffset = new KinesisShardOffset(shardOffset); this.key = buildCheckpointKeyForShard(shardOffset.getStream(), shardOffset.getShard()); @@ -1229,14 +1237,6 @@ public String toString() { } } - private enum ConsumerState { - NEW, - EXPIRED, - CONSUME, - SLEEP, - STOP - } - private final class ConsumerInvoker implements SchedulingAwareRunnable { private final Queue consumers = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java index 3b7b2294..1b5fa341 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,54 +58,112 @@ public KinesisShardOffset(KinesisShardOffset other) { this.reset = other.isReset(); } - public void setIteratorType(ShardIteratorType iteratorType) { - this.iteratorType = iteratorType; + public static KinesisShardOffset latest() { + return latest(null, null); } - public ShardIteratorType getIteratorType() { - return this.iteratorType; + public static KinesisShardOffset latest(String stream, String shard) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.LATEST); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + return kinesisShardOffset; } - public void setSequenceNumber(String sequenceNumber) { - this.sequenceNumber = sequenceNumber; + public static KinesisShardOffset trimHorizon() { + return trimHorizon(null, null); } - public void setTimestamp(Date timestamp) { - this.timestamp = timestamp; + public static KinesisShardOffset trimHorizon(String stream, String shard) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.TRIM_HORIZON); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + return kinesisShardOffset; } - public void setStream(String stream) { - this.stream = stream; + public static KinesisShardOffset atSequenceNumber(String sequenceNumber) { + return atSequenceNumber(null, null, sequenceNumber); } - public void setShard(String shard) { - this.shard = shard; + public static KinesisShardOffset atSequenceNumber(String stream, String shard, String sequenceNumber) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_SEQUENCE_NUMBER); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + kinesisShardOffset.sequenceNumber = sequenceNumber; + return kinesisShardOffset; } - public void setReset(boolean reset) { - this.reset = reset; + public static KinesisShardOffset afterSequenceNumber(String sequenceNumber) { + return afterSequenceNumber(null, null, sequenceNumber); + } + + public static KinesisShardOffset afterSequenceNumber(String stream, String shard, String sequenceNumber) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + kinesisShardOffset.sequenceNumber = sequenceNumber; + return kinesisShardOffset; + } + + public static KinesisShardOffset atTimestamp(Date timestamp) { + return atTimestamp(null, null, timestamp); + } + + public static KinesisShardOffset atTimestamp(String stream, String shard, Date timestamp) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_TIMESTAMP); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + kinesisShardOffset.timestamp = timestamp; + return kinesisShardOffset; + } + + public ShardIteratorType getIteratorType() { + return this.iteratorType; + } + + public void setIteratorType(ShardIteratorType iteratorType) { + this.iteratorType = iteratorType; } public String getSequenceNumber() { return this.sequenceNumber; } + public void setSequenceNumber(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + public Date getTimestamp() { return this.timestamp; } + public void setTimestamp(Date timestamp) { + this.timestamp = timestamp; + } + public String getStream() { return this.stream; } + public void setStream(String stream) { + this.stream = stream; + } + public String getShard() { return this.shard; } + public void setShard(String shard) { + this.shard = shard; + } + public boolean isReset() { return this.reset; } + public void setReset(boolean reset) { + this.reset = reset; + } + public KinesisShardOffset reset() { this.reset = true; return this; @@ -143,62 +201,4 @@ public String toString() { + '\'' + ", reset=" + this.reset + '}'; } - public static KinesisShardOffset latest() { - return latest(null, null); - } - - public static KinesisShardOffset latest(String stream, String shard) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.LATEST); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - return kinesisShardOffset; - } - - public static KinesisShardOffset trimHorizon() { - return trimHorizon(null, null); - } - - public static KinesisShardOffset trimHorizon(String stream, String shard) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.TRIM_HORIZON); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - return kinesisShardOffset; - } - - public static KinesisShardOffset atSequenceNumber(String sequenceNumber) { - return atSequenceNumber(null, null, sequenceNumber); - } - - public static KinesisShardOffset atSequenceNumber(String stream, String shard, String sequenceNumber) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_SEQUENCE_NUMBER); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - kinesisShardOffset.sequenceNumber = sequenceNumber; - return kinesisShardOffset; - } - - public static KinesisShardOffset afterSequenceNumber(String sequenceNumber) { - return afterSequenceNumber(null, null, sequenceNumber); - } - - public static KinesisShardOffset afterSequenceNumber(String stream, String shard, String sequenceNumber) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AFTER_SEQUENCE_NUMBER); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - kinesisShardOffset.sequenceNumber = sequenceNumber; - return kinesisShardOffset; - } - - public static KinesisShardOffset atTimestamp(Date timestamp) { - return atTimestamp(null, null, timestamp); - } - - public static KinesisShardOffset atTimestamp(String stream, String shard, Date timestamp) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_TIMESTAMP); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - kinesisShardOffset.timestamp = timestamp; - return kinesisShardOffset; - } - } diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java index 9b6c90e5..ac5e91eb 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java b/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java index 2b7e0db4..cecd5ce4 100644 --- a/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java +++ b/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java index ef70d69f..e3bf92ed 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 67aa8d4f..9d9ef217 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java b/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java index 4694b385..b4395e16 100644 --- a/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java +++ b/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,8 +46,8 @@ public abstract class AbstractMessageAttributesHeaderMapper implements Header private static final Log logger = LogFactory.getLog(SqsHeaderMapper.class); - private volatile String[] outboundHeaderNames = { "!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP, - "!" + AwsHeaders.MESSAGE_ID, "!" + AwsHeaders.QUEUE, "!" + AwsHeaders.TOPIC, "*" }; + private volatile String[] outboundHeaderNames = {"!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP, + "!" + AwsHeaders.MESSAGE_ID, "!" + AwsHeaders.QUEUE, "!" + AwsHeaders.TOPIC, "*"}; /** * Provide the header names that should be mapped to a AWS request object attributes diff --git a/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java b/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java index 7f594f90..c3737450 100644 --- a/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java +++ b/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java b/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java index 6dd64a04..d2fd5d30 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java +++ b/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java b/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java index 82a74c56..5ba5f2c1 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java +++ b/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java b/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java index 636d2ff2..1d3e8773 100644 --- a/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java +++ b/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java b/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java index d81f5552..5435355b 100644 --- a/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java +++ b/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java b/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java index 7aedb02a..14052c44 100644 --- a/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java +++ b/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java b/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java index 65c98152..4b52b3f6 100644 --- a/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java +++ b/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java b/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java index e2548e17..093d8aa5 100644 --- a/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java +++ b/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java b/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java index b8588c20..2de0526f 100644 --- a/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java +++ b/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java b/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java index 0f4f0d9d..80c14def 100644 --- a/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java +++ b/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java index dee547a4..58c6c3e0 100644 --- a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java +++ b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,13 +34,13 @@ */ public class LocalStackSslEnvironmentProvider implements IEnvironmentVariableProvider { - static { - System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); - } - @Override public Map getEnvironmentVariables() { return Collections.singletonMap("USE_SSL", "true"); } + static { + System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); + } + } diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index a22ffb99..976d8826 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -227,6 +227,8 @@ void testResharding() throws InterruptedException { @EnableIntegration public static class Config { + private final AtomicReference shardEndedEventReference = new AtomicReference<>(); + @Bean public AmazonKinesis amazonKinesis() { AmazonKinesis amazonKinesis = mock(AmazonKinesis.class); @@ -364,8 +366,6 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() { return adapter; } - private final AtomicReference shardEndedEventReference = new AtomicReference<>(); - @EventListener public void handleKinesisShardEndedEvent(KinesisShardEndedEvent event) { this.shardEndedEventReference.set(event); diff --git a/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java index 783df506..b5a48c5e 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,11 +73,11 @@ @DirtiesContext public class S3StreamingChannelAdapterTests { + private static final String S3_BUCKET = "S3_BUCKET"; + @TempDir static Path TEMPORARY_FOLDER; - private static final String S3_BUCKET = "S3_BUCKET"; - private static List S3_OBJECTS; @Autowired diff --git a/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java index d804bcda..0659c6b1 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java index 15dc9f5b..e209168d 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -107,7 +107,7 @@ void testSqsMessageDrivenChannelAdapter() { .hasCauseExactlyInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Queue with name 'foo' does not exist"); - assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] { "testQueue" }); + assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] {"testQueue"}); } @Configuration diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java index dd5aa9b9..c3c17726 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java index a081a959..e34f78f7 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,7 +79,7 @@ @LocalstackDockerProperties(randomizePorts = true, hostNameResolver = EnvironmentHostNameResolver.class, environmentVariableProvider = LocalStackSslEnvironmentProvider.class, - services = { "kinesis", "dynamodb", "cloudwatch" }) + services = {"kinesis", "dynamodb", "cloudwatch"}) @DirtiesContext public class KplKclIntegrationTests { diff --git a/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java b/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java index 2f713933..f156b819 100644 --- a/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java +++ b/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java index d26313f6..ec04efe6 100644 --- a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java +++ b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,10 +66,10 @@ @DirtiesContext public class DynamoDbLockRegistryTests { - private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); - private static AmazonDynamoDBAsync DYNAMO_DB; + private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + @Autowired private DynamoDbLockRegistry dynamoDbLockRegistry; diff --git a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java index 7acf710f..0da41c62 100644 --- a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java +++ b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,10 +54,10 @@ services = "dynamodb") class DynamoDbMetadataStoreTests { - private static AmazonDynamoDBAsync DYNAMO_DB; - private static final String TEST_TABLE = "testMetadataStore"; + private static AmazonDynamoDBAsync DYNAMO_DB; + private static DynamoDbMetadataStore store; private final String file1 = "/remotepath/filesTodownload/file-1.txt"; diff --git a/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java index f81105c6..c914e998 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java index 4bf11907..e25a3766 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java index efacf2b5..63e132b2 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,8 +111,6 @@ @DirtiesContext public class S3MessageHandlerTests { - private static SpelExpressionParser PARSER = new SpelExpressionParser(); - // define the bucket and file names used throughout the test private static final String S3_BUCKET_NAME = "myBucket"; @@ -123,6 +121,8 @@ public class S3MessageHandlerTests { @TempDir static Path temporaryFolder; + private static SpelExpressionParser PARSER = new SpelExpressionParser(); + @Autowired private AmazonS3 amazonS3; diff --git a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java index 639e63a6..a51673aa 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java index 4d5df0bd..120e0db5 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java index eaf4f0bf..9ce7a7a9 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml index 0149189d..212f97a2 100644 --- a/src/test/resources/log4j2-test.xml +++ b/src/test/resources/log4j2-test.xml @@ -2,7 +2,7 @@ - + @@ -10,7 +10,7 @@ - + From 3aeb6c1f25ab5548785152717788641f3cb0c2cb Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Thu, 11 Jun 2020 01:10:01 +0200 Subject: [PATCH 3/8] checkstyle fixes --- .../integration/aws/LocalStackSslEnvironmentProvider.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java index 58c6c3e0..a909704d 100644 --- a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java +++ b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java @@ -34,13 +34,13 @@ */ public class LocalStackSslEnvironmentProvider implements IEnvironmentVariableProvider { + static { + System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); + } + @Override public Map getEnvironmentVariables() { return Collections.singletonMap("USE_SSL", "true"); } - static { - System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); - } - } From e7471ce613060723f5d07cba3e30724abf4abc36 Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Fri, 12 Jun 2020 09:21:27 +0200 Subject: [PATCH 4/8] remove copyright fixes --- .../integration/aws/config/xml/AwsParserUtils.java | 2 +- .../aws/config/xml/S3OutboundChannelAdapterParser.java | 2 +- .../integration/aws/config/xml/S3OutboundGatewayParser.java | 2 +- .../aws/config/xml/SnsInboundChannelAdapterParser.java | 2 +- .../aws/config/xml/SnsOutboundChannelAdapterParser.java | 2 +- .../aws/config/xml/SqsMessageDrivenChannelAdapterParser.java | 2 +- .../aws/config/xml/SqsOutboundChannelAdapterParser.java | 2 +- .../integration/aws/inbound/S3InboundFileSynchronizer.java | 2 +- .../aws/inbound/S3InboundFileSynchronizingMessageSource.java | 2 +- .../integration/aws/inbound/S3StreamingMessageSource.java | 2 +- .../integration/aws/inbound/SnsInboundChannelAdapter.java | 2 +- .../aws/inbound/SqsMessageDrivenChannelAdapter.java | 2 +- .../aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java | 2 +- .../inbound/kinesis/KinesisMessageDrivenChannelAdapter.java | 2 +- .../integration/aws/inbound/kinesis/KinesisShardOffset.java | 3 ++- .../integration/aws/inbound/kinesis/ShardCheckpointer.java | 2 +- .../integration/aws/lock/DynamoDbLockRegistry.java | 2 +- .../integration/aws/metadata/DynamoDbMetadataStore.java | 2 +- .../integration/aws/outbound/KinesisMessageHandler.java | 2 +- .../integration/aws/outbound/KplMessageHandler.java | 2 +- .../aws/support/AbstractMessageAttributesHeaderMapper.java | 2 +- .../integration/aws/support/AwsRequestFailureException.java | 2 +- .../springframework/integration/aws/support/S3FileInfo.java | 2 +- .../integration/aws/support/S3RemoteFileTemplate.java | 2 +- .../org/springframework/integration/aws/support/S3Session.java | 2 +- .../integration/aws/support/S3SessionFactory.java | 2 +- .../integration/aws/support/SnsBodyBuilder.java | 2 +- .../integration/aws/support/SqsHeaderMapper.java | 2 +- .../support/filters/S3PersistentAcceptOnceFileListFilter.java | 2 +- .../aws/support/filters/S3RegexPatternFileListFilter.java | 2 +- .../aws/support/filters/S3SimplePatternFileListFilter.java | 2 +- .../integration/aws/EnvironmentHostNameResolver.java | 2 +- .../integration/aws/ExtendedDockerTestUtils.java | 2 +- .../integration/aws/LocalStackSslEnvironmentProvider.java | 2 +- .../aws/inbound/KinesisMessageDrivenChannelAdapterTests.java | 3 ++- .../integration/aws/inbound/S3InboundChannelAdapterTests.java | 2 +- .../aws/inbound/S3StreamingChannelAdapterTests.java | 2 +- .../integration/aws/inbound/SnsInboundChannelAdapterTests.java | 2 +- .../aws/inbound/SqsMessageDrivenChannelAdapterTests.java | 2 +- .../integration/aws/kinesis/KinesisIntegrationTests.java | 2 +- .../integration/aws/kinesis/KplKclIntegrationTests.java | 2 +- .../aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java | 2 +- .../integration/aws/lock/DynamoDbLockRegistryTests.java | 2 +- .../integration/aws/metadata/DynamoDbMetadataStoreTests.java | 2 +- .../integration/aws/outbound/KinesisMessageHandlerTests.java | 2 +- .../aws/outbound/KinesisProducingMessageHandlerTests.java | 2 +- .../integration/aws/outbound/S3MessageHandlerTests.java | 2 +- .../integration/aws/outbound/SnsMessageBuilderTests.java | 2 +- .../integration/aws/outbound/SnsMessageHandlerTests.java | 2 +- .../integration/aws/outbound/SqsMessageHandlerTests.java | 2 +- 50 files changed, 52 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java b/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java index 6d28ae3a..92e33eec 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java index 53e95d50..92e384ce 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java index b5581518..be437160 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/S3OutboundGatewayParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java index ed1858b0..15a85338 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SnsInboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java index 1a25ae29..0bf1576c 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java index 894f9362..1e437fdd 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java index 974dc0b9..cdaac23f 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java index 658aa079..dbce24a9 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java +++ b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java index 02b14de4..cc848ea8 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java +++ b/src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java b/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java index 1ba7a4ee..4a14bbac 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java +++ b/src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java index db7881e6..dad30245 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java index e0413f88..5f9c1216 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index 60136614..09380299 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index db4bc0e7..96219605 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java index 1b5fa341..6b25e622 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ * A model to represent a sequence in the shard for particular {@link ShardIteratorType}. * * @author Artem Bilan + * @author Matthias Wesolowski * @since 1.1 */ public class KinesisShardOffset { diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java index ac5e91eb..9b6c90e5 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java b/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java index e90903ed..ebb47375 100644 --- a/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java +++ b/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java b/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java index cecd5ce4..2b7e0db4 100644 --- a/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java +++ b/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java index e3bf92ed..ef70d69f 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 9d9ef217..f4e65b5c 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java b/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java index b4395e16..9381ad40 100644 --- a/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java +++ b/src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java b/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java index c3737450..7f594f90 100644 --- a/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java +++ b/src/main/java/org/springframework/integration/aws/support/AwsRequestFailureException.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java b/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java index d2fd5d30..6dd64a04 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java +++ b/src/main/java/org/springframework/integration/aws/support/S3FileInfo.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java b/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java index 5ba5f2c1..82a74c56 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java +++ b/src/main/java/org/springframework/integration/aws/support/S3RemoteFileTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3Session.java b/src/main/java/org/springframework/integration/aws/support/S3Session.java index efc4f4e2..ea6be487 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3Session.java +++ b/src/main/java/org/springframework/integration/aws/support/S3Session.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java b/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java index b0d25b62..bae68a81 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java +++ b/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java b/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java index 1d3e8773..636d2ff2 100644 --- a/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java +++ b/src/main/java/org/springframework/integration/aws/support/SnsBodyBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java b/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java index 5435355b..d81f5552 100644 --- a/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java +++ b/src/main/java/org/springframework/integration/aws/support/SqsHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java b/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java index 14052c44..7aedb02a 100644 --- a/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java +++ b/src/main/java/org/springframework/integration/aws/support/filters/S3PersistentAcceptOnceFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java b/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java index 4b52b3f6..65c98152 100644 --- a/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java +++ b/src/main/java/org/springframework/integration/aws/support/filters/S3RegexPatternFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java b/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java index 093d8aa5..e2548e17 100644 --- a/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java +++ b/src/main/java/org/springframework/integration/aws/support/filters/S3SimplePatternFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java b/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java index 2de0526f..7152a19d 100644 --- a/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java +++ b/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java b/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java index 80c14def..e12978fd 100644 --- a/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java +++ b/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java index a909704d..0f616949 100644 --- a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java +++ b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index 976d8826..22da8188 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,6 +73,7 @@ /** * @author Artem Bilan + * @author Matthias Wesolowski * @since 1.1 */ @SpringJUnitConfig diff --git a/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java index 3f03d7f2..ff5ce713 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java index b5a48c5e..31ed1c5a 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java index 0659c6b1..d804bcda 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/SnsInboundChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java index e209168d..4897a887 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java index c3c17726..dd5aa9b9 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java index e34f78f7..3316fd30 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java b/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java index f156b819..2f713933 100644 --- a/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java +++ b/src/test/java/org/springframework/integration/aws/leader/DynamoDbLockRegistryLeaderInitiatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java index ec04efe6..339d3d40 100644 --- a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java +++ b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java index 0da41c62..8cf323fc 100644 --- a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java +++ b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java index c914e998..f81105c6 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KinesisMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java index e25a3766..4bf11907 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KinesisProducingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java index 63e132b2..f9b5c57d 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java index a51673aa..639e63a6 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java index 120e0db5..4d5df0bd 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java index 9ce7a7a9..eaf4f0bf 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 7fcaf3373a825aedb196cc1360ca502e94ac4851 Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Fri, 12 Jun 2020 09:22:50 +0200 Subject: [PATCH 5/8] fix copyright --- .../aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java | 2 +- .../integration/aws/outbound/KplMessageHandler.java | 2 +- .../integration/aws/EnvironmentHostNameResolver.java | 2 +- .../integration/aws/ExtendedDockerTestUtils.java | 2 +- .../integration/aws/LocalStackSslEnvironmentProvider.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index 09380299..5bded3d0 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index f4e65b5c..67aa8d4f 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java b/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java index 7152a19d..b8588c20 100644 --- a/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java +++ b/src/test/java/org/springframework/integration/aws/EnvironmentHostNameResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java b/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java index e12978fd..0f4f0d9d 100644 --- a/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java +++ b/src/test/java/org/springframework/integration/aws/ExtendedDockerTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java index 0f616949..dee547a4 100644 --- a/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java +++ b/src/test/java/org/springframework/integration/aws/LocalStackSslEnvironmentProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From fbbca219905eb963c418acbacabbc564b1007cba Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Fri, 12 Jun 2020 09:26:53 +0200 Subject: [PATCH 6/8] fix copyright --- .../aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java | 2 +- .../integration/aws/inbound/kinesis/KinesisShardOffset.java | 2 +- .../org/springframework/integration/aws/support/S3Session.java | 2 +- .../integration/aws/support/S3SessionFactory.java | 2 +- .../integration/aws/inbound/S3InboundChannelAdapterTests.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index 96219605..db4bc0e7 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java index 6b25e622..1f6d14a3 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3Session.java b/src/main/java/org/springframework/integration/aws/support/S3Session.java index ea6be487..efc4f4e2 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3Session.java +++ b/src/main/java/org/springframework/integration/aws/support/S3Session.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java b/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java index bae68a81..b0d25b62 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java +++ b/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java index ff5ce713..3f03d7f2 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From c42885caa55b7dee68eb9b1c19778b8984ea7b55 Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Fri, 12 Jun 2020 09:26:53 +0200 Subject: [PATCH 7/8] fix copyright --- .../aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java | 2 +- .../integration/aws/inbound/kinesis/KinesisShardOffset.java | 2 +- .../integration/aws/lock/DynamoDbLockRegistry.java | 2 +- .../org/springframework/integration/aws/support/S3Session.java | 2 +- .../integration/aws/support/S3SessionFactory.java | 2 +- .../integration/aws/inbound/S3InboundChannelAdapterTests.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index 96219605..db4bc0e7 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java index 6b25e622..1f6d14a3 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java b/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java index ebb47375..e90903ed 100644 --- a/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java +++ b/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3Session.java b/src/main/java/org/springframework/integration/aws/support/S3Session.java index ea6be487..efc4f4e2 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3Session.java +++ b/src/main/java/org/springframework/integration/aws/support/S3Session.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java b/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java index bae68a81..b0d25b62 100644 --- a/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java +++ b/src/main/java/org/springframework/integration/aws/support/S3SessionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java index ff5ce713..3f03d7f2 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/S3InboundChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From dde764478da5d91fea5207755ac6e21ab744281c Mon Sep 17 00:00:00 2001 From: Matthias Wesolowski Date: Mon, 15 Jun 2020 20:11:30 +0200 Subject: [PATCH 8/8] - remove blanks from javadoc - revert unnecassary formatting changes --- .../KinesisMessageDrivenChannelAdapter.java | 5 +- .../inbound/kinesis/KinesisShardOffset.java | 147 +++++++++--------- ...nesisMessageDrivenChannelAdapterTests.java | 3 +- 3 files changed, 76 insertions(+), 79 deletions(-) diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java index db4bc0e7..e17cc650 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java @@ -231,7 +231,6 @@ public void setStreamInitialSequence(KinesisShardOffset streamInitialSequence) { /** * Specify a {@link Converter} to deserialize the {@code byte[]} from record's body. Can be {@code * null} meaning no deserialization. - * * @param converter the {@link Converter} to use or null */ public void setConverter(Converter converter) { @@ -250,7 +249,6 @@ public void setCheckpointMode(CheckpointMode checkpointMode) { /** * Sets the interval between 2 checkpoints. Only used when checkpointMode is periodic. - * * @param checkpointsInterval interval between 2 checkpoints (in milliseconds) * @since 2.2 */ @@ -260,7 +258,6 @@ public void setCheckpointsInterval(long checkpointsInterval) { /** * The maximum record to poll per on get-records request. Not greater then {@code 10000}. - * * @param recordsLimit the number of records to for per on get-records request. * @see GetRecordsRequest#setLimit */ @@ -538,7 +535,7 @@ private List readShardList(String stream, int retryCount) { if (retryCount > this.describeStreamRetries) { throw new IllegalStateException( - "Kinesis could not read shards from stream with name:[" + stream + "] "); + "Kinesis could not read shards from stream with name [" + stream + "] "); } ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(stream); diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java index 1f6d14a3..3b7b2294 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisShardOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ * A model to represent a sequence in the shard for particular {@link ShardIteratorType}. * * @author Artem Bilan - * @author Matthias Wesolowski * @since 1.1 */ public class KinesisShardOffset { @@ -59,112 +58,54 @@ public KinesisShardOffset(KinesisShardOffset other) { this.reset = other.isReset(); } - public static KinesisShardOffset latest() { - return latest(null, null); - } - - public static KinesisShardOffset latest(String stream, String shard) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.LATEST); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - return kinesisShardOffset; - } - - public static KinesisShardOffset trimHorizon() { - return trimHorizon(null, null); - } - - public static KinesisShardOffset trimHorizon(String stream, String shard) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.TRIM_HORIZON); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - return kinesisShardOffset; - } - - public static KinesisShardOffset atSequenceNumber(String sequenceNumber) { - return atSequenceNumber(null, null, sequenceNumber); - } - - public static KinesisShardOffset atSequenceNumber(String stream, String shard, String sequenceNumber) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_SEQUENCE_NUMBER); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - kinesisShardOffset.sequenceNumber = sequenceNumber; - return kinesisShardOffset; + public void setIteratorType(ShardIteratorType iteratorType) { + this.iteratorType = iteratorType; } - public static KinesisShardOffset afterSequenceNumber(String sequenceNumber) { - return afterSequenceNumber(null, null, sequenceNumber); + public ShardIteratorType getIteratorType() { + return this.iteratorType; } - public static KinesisShardOffset afterSequenceNumber(String stream, String shard, String sequenceNumber) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AFTER_SEQUENCE_NUMBER); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - kinesisShardOffset.sequenceNumber = sequenceNumber; - return kinesisShardOffset; + public void setSequenceNumber(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; } - public static KinesisShardOffset atTimestamp(Date timestamp) { - return atTimestamp(null, null, timestamp); + public void setTimestamp(Date timestamp) { + this.timestamp = timestamp; } - public static KinesisShardOffset atTimestamp(String stream, String shard, Date timestamp) { - KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_TIMESTAMP); - kinesisShardOffset.stream = stream; - kinesisShardOffset.shard = shard; - kinesisShardOffset.timestamp = timestamp; - return kinesisShardOffset; + public void setStream(String stream) { + this.stream = stream; } - public ShardIteratorType getIteratorType() { - return this.iteratorType; + public void setShard(String shard) { + this.shard = shard; } - public void setIteratorType(ShardIteratorType iteratorType) { - this.iteratorType = iteratorType; + public void setReset(boolean reset) { + this.reset = reset; } public String getSequenceNumber() { return this.sequenceNumber; } - public void setSequenceNumber(String sequenceNumber) { - this.sequenceNumber = sequenceNumber; - } - public Date getTimestamp() { return this.timestamp; } - public void setTimestamp(Date timestamp) { - this.timestamp = timestamp; - } - public String getStream() { return this.stream; } - public void setStream(String stream) { - this.stream = stream; - } - public String getShard() { return this.shard; } - public void setShard(String shard) { - this.shard = shard; - } - public boolean isReset() { return this.reset; } - public void setReset(boolean reset) { - this.reset = reset; - } - public KinesisShardOffset reset() { this.reset = true; return this; @@ -202,4 +143,62 @@ public String toString() { + '\'' + ", reset=" + this.reset + '}'; } + public static KinesisShardOffset latest() { + return latest(null, null); + } + + public static KinesisShardOffset latest(String stream, String shard) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.LATEST); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + return kinesisShardOffset; + } + + public static KinesisShardOffset trimHorizon() { + return trimHorizon(null, null); + } + + public static KinesisShardOffset trimHorizon(String stream, String shard) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.TRIM_HORIZON); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + return kinesisShardOffset; + } + + public static KinesisShardOffset atSequenceNumber(String sequenceNumber) { + return atSequenceNumber(null, null, sequenceNumber); + } + + public static KinesisShardOffset atSequenceNumber(String stream, String shard, String sequenceNumber) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_SEQUENCE_NUMBER); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + kinesisShardOffset.sequenceNumber = sequenceNumber; + return kinesisShardOffset; + } + + public static KinesisShardOffset afterSequenceNumber(String sequenceNumber) { + return afterSequenceNumber(null, null, sequenceNumber); + } + + public static KinesisShardOffset afterSequenceNumber(String stream, String shard, String sequenceNumber) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + kinesisShardOffset.sequenceNumber = sequenceNumber; + return kinesisShardOffset; + } + + public static KinesisShardOffset atTimestamp(Date timestamp) { + return atTimestamp(null, null, timestamp); + } + + public static KinesisShardOffset atTimestamp(String stream, String shard, Date timestamp) { + KinesisShardOffset kinesisShardOffset = new KinesisShardOffset(ShardIteratorType.AT_TIMESTAMP); + kinesisShardOffset.stream = stream; + kinesisShardOffset.shard = shard; + kinesisShardOffset.timestamp = timestamp; + return kinesisShardOffset; + } + } diff --git a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java index 22da8188..c7da48a6 100644 --- a/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java @@ -213,7 +213,8 @@ void testResharding() throws InterruptedException { assertThat(n).isLessThan(100); // When resharding happens the describeStream() is performed again - verify(this.amazonKinesisForResharding, atLeast(1)).listShards(any(ListShardsRequest.class)); + verify(this.amazonKinesisForResharding, atLeast(1)) + .listShards(any(ListShardsRequest.class)); this.reshardingChannelAdapter.stop();