From bdbc580b02b3843617ed3f3a10c4056864562fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Guilherme=20de=20Souza=20Lima?= Date: Sun, 4 Dec 2022 11:07:17 -0300 Subject: [PATCH 1/9] GH-2496: Reuse retry topic for maxInterval delay Resolves spring-projects#2496 Created SameIntervalTopicReuseStrategy, used to keep retrying in the same retry topic when the delays of the last retries are the same. --- .../src/main/asciidoc/retrytopic.adoc | 133 +++++++++++++----- .../kafka/annotation/RetryableTopic.java | 8 ++ .../RetryableTopicAnnotationProcessor.java | 1 + .../DefaultDestinationTopicResolver.java | 16 ++- .../kafka/retrytopic/DestinationTopic.java | 6 +- .../DestinationTopicPropertiesFactory.java | 73 ++++++++-- .../RetryTopicConfigurationBuilder.java | 27 +++- .../SameIntervalTopicReuseStrategy.java | 39 +++++ .../DefaultDestinationTopicResolverTests.java | 37 ++++- ...estinationTopicPropertiesFactoryTests.java | 122 ++++++++++++---- .../retrytopic/DestinationTopicTests.java | 48 ++++++- 11 files changed, 424 insertions(+), 86 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 8272957edf..b0f6066e73 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -398,39 +398,6 @@ If your back off policy requires delays with values bigger than that, adjust the IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries. -===== Single Topic Fixed Delay Retries - -If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. -This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. - -==== -[source, java] ----- -@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) -@KafkaListener(topics = "my-annotated-topic") -public void processMessage(MyPojo message) { - // ... message processing -} ----- -==== - -==== -[source, java] ----- -@Bean -public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { - return RetryTopicConfigurationBuilder - .newInstance() - .fixedBackoff(3000) - .maxAttempts(5) - .useSingleTopicForFixedDelays() - .build(); -} ----- -==== - -NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ... - ===== Global timeout You can set the global timeout for the retrying process. @@ -677,7 +644,7 @@ IMPORTANT: Note that the blocking retries behavior is allowlist - you add the ex IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration. -==== Topic Naming +==== Topic Amount and Naming Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic. @@ -687,6 +654,11 @@ Examples: "my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix". +NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ..., retry-n. Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1. + +You can <>, choose whether to append <>, use a <>, and use a <> when using exponential backoffs. + +[[retry-topics-and-dlt-suffixes]] ===== Retry Topics and Dlt Suffixes You can specify the suffixes that will be used by the retry and dlt topics. @@ -718,6 +690,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate t NOTE: The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. +[[append-index-or-delay]] ===== Appending the Topic's Index or Delay You can either append the topic's index or delay values after the suffix. @@ -748,6 +721,98 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate templa NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index. +[[single-topic-fixed-delay]] +===== Single Topic for Fixed Delay Retries + +If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. +This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. + +==== +[source, java] +---- +@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) +@KafkaListener(topics = "my-annotated-topic") +public void processMessage(MyPojo message) { + // ... message processing +} +---- +==== + +==== +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackoff(3000) + .maxAttempts(5) + .useSingleTopicForFixedDelays() + .build(); +} +---- +==== + +NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ... + + +[[single-topic-maxinterval-delay]] +===== Single Topic for maxInterval Exponential Delay + +If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`. This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended. + +NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. + +By default, the default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index. + +For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be: + +* -retry-1000 +* -retry-2000 +* -retry-4000 +* -retry-8000 +* -retry-16000-0 +* -retry-16000-1 +* -retry-16000-2 +* ... +* -retry-16000-224 + +When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be: + +* -retry-1000 +* -retry-2000 +* -retry-4000 +* -retry-8000 +* -retry-16000 + +==== +[source, java] +---- +@RetryableTopic(attempts = 230, + backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) +@KafkaListener(topics = "my-annotated-topic") +public void processMessage(MyPojo message) { + // ... message processing +} +---- +==== + +==== +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .exponentialBackoff(1000, 2, 16000) + .maxAttempts(230) + .useSingleTopicForSameIntervals() + .build(); +} +---- +==== + ===== Custom naming strategies More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index ab40042188..7479993fc0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -25,6 +25,7 @@ import org.springframework.kafka.retrytopic.DltStrategy; import org.springframework.kafka.retrytopic.FixedDelayStrategy; import org.springframework.kafka.retrytopic.RetryTopicConstants; +import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; import org.springframework.retry.annotation.Backoff; @@ -177,6 +178,13 @@ */ TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; + + /** + * Topic reuse strategy for sequential attempts made with a same backoff interval. + * @return the strategy. + */ + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + /** * Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up. * @return the dlt strategy. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index b5b4186d28..9214af535e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -146,6 +146,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .dltProcessingFailureStrategy(annotation.dltStrategy()) .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) + .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) .timeoutAfter(timeout) .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 06587174d0..031aa34987 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -35,6 +35,7 @@ import org.springframework.kafka.listener.ExceptionClassifier; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.listener.TimestampedException; +import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -133,7 +134,7 @@ && isNotFatalException(e) } private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { - return destinationTopicHolder.getSourceDestination().isSingleTopicRetry() + return destinationTopicHolder.getSourceDestination().isReusableRetryTopic() ? destinationTopicHolder.getSourceDestination() : destinationTopicHolder.getNextDestination(); } @@ -192,6 +193,7 @@ public void addDestinationTopics(String mainListenerId, List d throw new IllegalStateException("Cannot add new destinations, " + DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed."); } + validateDestinations(destinationsToAdd); synchronized (this.sourceDestinationsHolderMap) { Map map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId, id -> new HashMap<>()); @@ -199,6 +201,18 @@ public void addDestinationTopics(String mainListenerId, List d } } + private void validateDestinations(List destinationsToAdd) { + for (int i = 0; i < destinationsToAdd.size(); i++) { + DestinationTopic destination = destinationsToAdd.get(i); + if (destination.isReusableRetryTopic()) { + Assert.isTrue((i == (destinationsToAdd.size() - 1) || + ((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))), + String.format("In the destination topic chain, the type %s can only be " + + "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC)); + } + } + } + private Map correlatePairSourceAndDestinationValues( List destinationList) { return IntStream diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 609c0cac56..f6a6804f49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -68,8 +68,8 @@ public boolean isNoOpsTopic() { return Type.NO_OPS.equals(this.properties.type); } - public boolean isSingleTopicRetry() { - return Type.SINGLE_TOPIC_RETRY.equals(this.properties.type); + public boolean isReusableRetryTopic() { + return Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type); } public boolean isMainTopic() { @@ -280,6 +280,6 @@ public boolean isMainEndpoint() { } enum Type { - MAIN, RETRY, SINGLE_TOPIC_RETRY, DLT, NO_OPS + MAIN, RETRY, REUSABLE_RETRY_TOPIC, DLT, NO_OPS } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 327d840323..083d0e0bae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; @@ -34,6 +35,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author João Lima * @since 2.7 * */ @@ -59,6 +61,8 @@ public class DestinationTopicPropertiesFactory { private final TopicSuffixingStrategy topicSuffixingStrategy; + private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy; + private final long timeout; @Nullable @@ -70,6 +74,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff FixedDelayStrategy fixedDelayStrategy, DltStrategy dltStrategy, TopicSuffixingStrategy topicSuffixingStrategy, + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, long timeout) { this.dltStrategy = dltStrategy; @@ -78,6 +83,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.numPartitions = numPartitions; this.fixedDelayStrategy = fixedDelayStrategy; this.topicSuffixingStrategy = topicSuffixingStrategy; + this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); this.backOffValues = backOffValues; @@ -105,9 +111,9 @@ public List createProperties() { private List createPropertiesForFixedDelaySingleTopic() { return isNoDltStrategy() ? Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn())) + createRetryProperties(1, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, getShouldRetryOn())) : Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()), + createRetryProperties(1, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, getShouldRetryOn()), createDltProperties()); } @@ -119,23 +125,48 @@ private boolean isSingleTopicStrategy() { return FixedDelayStrategy.SINGLE_TOPIC.equals(this.fixedDelayStrategy); } + private boolean isSingleTopicSameIntervalTopicReuseStrategy() { + return SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(this.sameIntervalTopicReuseStrategy); + } + private List createPropertiesForDefaultTopicStrategy() { + + int retryTopicsAmount = retryTopicsAmount(); + return IntStream.rangeClosed(0, isNoDltStrategy() - ? this.maxAttempts - 1 - : this.maxAttempts) - .mapToObj(this::createRetryOrDltTopicSuffixes) + ? retryTopicsAmount + : retryTopicsAmount + 1) + .mapToObj(this::createTopicProperties) .collect(Collectors.toList()); } + int retryTopicsAmount() { + return this.backOffValues.size() - reusableTopicAttempts(); + } + + private int reusableTopicAttempts() { + return this.backOffValues.size() > 0 + ? !isFixedDelay() + ? isSingleTopicSameIntervalTopicReuseStrategy() + // Assuming that duplicates are always in + // the end of the list. + ? amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1 + : 0 + : isSingleTopicStrategy() + ? this.backOffValues.size() - 1 + : 0 + : 0; + } + private boolean isNoDltStrategy() { return DltStrategy.NO_DLT.equals(this.dltStrategy); } - private DestinationTopic.Properties createRetryOrDltTopicSuffixes(int index) { + private DestinationTopic.Properties createTopicProperties(int index) { BiPredicate shouldRetryOn = getShouldRetryOn(); return index == 0 ? createMainTopicProperties() - : index < this.maxAttempts + : (index <= this.retryTopicsAmount()) ? createRetryProperties(index, DestinationTopic.Type.RETRY, shouldRetryOn) : createDltProperties(); } @@ -160,7 +191,10 @@ private DestinationTopic.Properties createRetryProperties(int index, BiPredicate shouldRetryOn) { int indexInBackoffValues = index - 1; Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); - return createProperties(topicType, shouldRetryOn, indexInBackoffValues, + DestinationTopic.Type topicTypeToUse = isDelayWithReusedTopic(thisBackOffValue) + ? Type.REUSABLE_RETRY_TOPIC + : topicType; + return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues, getTopicSuffix(indexInBackoffValues, thisBackOffValue)); } @@ -171,10 +205,20 @@ private String getTopicSuffix(int indexInBackoffValues, Long thisBackOffValue) { ? joinWithRetrySuffix(indexInBackoffValues) : hasDuplicates(thisBackOffValue) ? joinWithRetrySuffix(thisBackOffValue) - .concat("-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue)) + .concat(suffixForRepeatedInterval(indexInBackoffValues, thisBackOffValue)) : joinWithRetrySuffix(thisBackOffValue); } + private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBackOffValue) { + return isSingleTopicSameIntervalTopicReuseStrategy() + ? "" + : "-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue); + } + + private boolean isDelayWithReusedTopic(Long thisBackOffValue) { + return hasDuplicates(thisBackOffValue) && isSingleTopicSameIntervalTopicReuseStrategy(); + } + private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) { return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue); } @@ -184,11 +228,14 @@ private boolean isSuffixWithIndexStrategy() { } private boolean hasDuplicates(Long thisBackOffValue) { - return this - .backOffValues + return amountOfDuplicates(thisBackOffValue) > 1; + } + + private int amountOfDuplicates(Long thisBackOffValue) { + return Long.valueOf(this.backOffValues .stream() .filter(value -> value.equals(thisBackOffValue)) - .count() > 1; + .count()).intValue(); } private DestinationTopic.Properties createProperties(DestinationTopic.Type topicType, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 188273d1a0..6b67d0d279 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -79,6 +79,8 @@ public class RetryTopicConfigurationBuilder { private TopicSuffixingStrategy topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; + private SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + @Nullable private Boolean autoStartDltHandler; @@ -241,6 +243,29 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt return this; } + /** + * Configure the retry topic name {@link SameIntervalTopicReuseStrategy}. + * @param sameIntervalTopicReuseStrategy the strategy. + * @return the builder. + */ + public RetryTopicConfigurationBuilder sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy) { + this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; + return this; + } + + /** + * For exponential backoff, configure the use of a single retry topic + * for the attempts that have the {@code maxInterval}. + * + * @return the builder. + * @see SameIntervalTopicReuseStrategy#SINGLE_TOPIC + * + */ + public RetryTopicConfigurationBuilder useSingleTopicForSameIntervals() { + this.sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC; + return this; + } + /* ---------------- Configure BackOff -------------- */ /** @@ -553,7 +578,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.fixedDelayStrategy, this.dltStrategy, - this.topicSuffixingStrategy, this.timeout) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java new file mode 100644 index 0000000000..7af3218183 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +/** + * + * Strategy for topic reuse when multiple, sequential retries have the same backoff interval. + * + * @author João Lima + * @since 3.1 + * + */ +public enum SameIntervalTopicReuseStrategy { + + /** + * Uses a single retry topic for sequential attempts with the same interval. + */ + SINGLE_TOPIC, + + /** + * Uses one separate topic per retry attempt. + */ + MULTIPLE_TOPICS + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index bf691d2581..71366a03d3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -17,12 +17,15 @@ package org.springframework.kafka.retrytopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.assertj.core.api.Assertions.assertThatNullPointerException; import java.time.Clock; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,6 +37,7 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.listener.TimestampedException; +import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.kafka.support.converter.ConversionException; /** @@ -57,8 +61,6 @@ class DefaultDestinationTopicResolverTests extends DestinationTopicTests { private final long originalTimestamp = Instant.now(this.clock).toEpochMilli(); - private final long failureTimestamp = Instant.now(this.clock).plusMillis(500).toEpochMilli(); - @BeforeEach public void setup() { @@ -67,6 +69,7 @@ public void setup() { defaultDestinationTopicContainer.addDestinationTopics("id", allFirstDestinationsTopics); defaultDestinationTopicContainer.addDestinationTopics("id", allSecondDestinationTopics); defaultDestinationTopicContainer.addDestinationTopics("id", allThirdDestinationTopics); + defaultDestinationTopicContainer.addDestinationTopics("id", allFourthDestinationTopics); } @@ -97,6 +100,18 @@ void shouldResolveRetryDestination() { assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", dltDestinationTopic2.getDestinationName(), 1, new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic2); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", mainDestinationTopic4.getDestinationName(), 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(singleFixedRetryDestinationTopic4); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", singleFixedRetryDestinationTopic4.getDestinationName(), maxAttempts - 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(singleFixedRetryDestinationTopic4); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", singleFixedRetryDestinationTopic4.getDestinationName(), maxAttempts, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic4); } @Test @@ -190,6 +205,20 @@ void shouldThrowIfNoDestinationFound() { new IllegalArgumentException(), originalTimestamp)); } + @Test + void shouldThrowIfMultipleReusableRetryTopicsAdded() { + DefaultDestinationTopicResolver destinationResolver = new DefaultDestinationTopicResolver(clock); + destinationResolver.setApplicationContext(applicationContext); + destinationResolver.addDestinationTopics("id", allFirstDestinationsTopics); + + List destinationTopics = Arrays + .asList(mainDestinationTopic4, singleFixedRetryDestinationTopic4, singleFixedRetryDestinationTopic4, dltDestinationTopic4); + + assertThatIllegalArgumentException().isThrownBy( + () -> destinationResolver.addDestinationTopics("id", destinationTopics)) + .withMessageMatching(String.format(".*%s.*last retry topic.*", Type.REUSABLE_RETRY_TOPIC)); + } + @Test void shouldResolveNoOpsIfDltAndNotRetryable() { assertThat(defaultDestinationTopicContainer @@ -197,10 +226,6 @@ void shouldResolveNoOpsIfDltAndNotRetryable() { new RuntimeException(), originalTimestamp)).isEqualTo(noOpsDestinationTopic3); } - private long getExpectedNextExecutionTime(DestinationTopic destinationTopic) { - return failureTimestamp + destinationTopic.getDestinationDelay(); - } - @Test void shouldThrowIfAddsDestinationsAfterClosed() { defaultDestinationTopicContainer diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 2dbb0d3064..3c488efe78 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,12 +54,18 @@ class DestinationTopicPropertiesFactoryTests { private final FixedDelayStrategy fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; - private final TopicSuffixingStrategy defaultTopicSuffixingStrategy = + private final TopicSuffixingStrategy suffixWithDelayValueSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; private final TopicSuffixingStrategy suffixWithIndexTopicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE; + private final SameIntervalTopicReuseStrategy multipleTopicsSameIntervalReuseStrategy = + SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + + private final SameIntervalTopicReuseStrategy singleTopicSameIntervalReuseStrategy = + SameIntervalTopicReuseStrategy.SINGLE_TOPIC; + private final DltStrategy dltStrategy = DltStrategy.FAIL_ON_ERROR; @@ -88,7 +94,7 @@ void shouldCreateMainAndDltProperties() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - dltStrategy, defaultTopicSuffixingStrategy, RetryTopicConstants.NOT_SET) + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET) .createProperties(); // then @@ -131,7 +137,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, RetryTopicConstants.NOT_SET) + dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET) .createProperties(); List destinationTopicList = propertiesList @@ -145,6 +151,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-1000"); assertThat(firstRetryProperties.isDltTopic()).isFalse(); DestinationTopic firstRetryDestinationTopic = destinationTopicList.get(1); + assertThat(firstRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(firstRetryDestinationTopic.getDestinationDelay()).isEqualTo(1000); assertThat(firstRetryDestinationTopic.getDestinationPartitions()).isEqualTo(numPartitions); assertThat(firstRetryDestinationTopic.shouldRetryOn(0, new IllegalArgumentException())).isTrue(); @@ -155,6 +162,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { assertThat(secondRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-2000"); assertThat(secondRetryProperties.isDltTopic()).isFalse(); DestinationTopic secondRetryDestinationTopic = destinationTopicList.get(2); + assertThat(secondRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(secondRetryDestinationTopic.getDestinationDelay()).isEqualTo(2000); assertThat(secondRetryDestinationTopic.getDestinationPartitions()).isEqualTo(numPartitions); assertThat(secondRetryDestinationTopic.shouldRetryOn(0, new IllegalArgumentException())).isTrue(); @@ -177,14 +185,9 @@ void shouldNotCreateDltProperties() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, RetryTopicConstants.NOT_SET) + noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET) .createProperties(); - List destinationTopicList = propertiesList - .stream() - .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); - // then assertThat(propertiesList.size() == 3).isTrue(); assertThat(propertiesList.get(2).isDltTopic()).isFalse(); @@ -203,7 +206,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, - dltStrategy, defaultTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -213,14 +216,13 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { // then assertThat(propertiesList.size() == 3).isTrue(); - DestinationTopic.Properties mainTopicProperties = propertiesList.get(0); DestinationTopic mainDestinationTopic = destinationTopicList.get(0); assertThat(mainDestinationTopic.isMainTopic()).isTrue(); DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); - assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isTrue(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); DestinationTopic.Properties dltProperties = propertiesList.get(2); @@ -245,7 +247,7 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, defaultTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -255,20 +257,19 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { // then assertThat(propertiesList.size() == 4).isTrue(); - DestinationTopic.Properties mainTopicProperties = propertiesList.get(0); DestinationTopic mainDestinationTopic = destinationTopicList.get(0); assertThat(mainDestinationTopic.isMainTopic()).isTrue(); DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-0"); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); - assertThat(retryDestinationTopic.isSingleTopicRetry()).isFalse(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(5000); DestinationTopic.Properties secondRetryProperties = propertiesList.get(2); assertThat(secondRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-1"); DestinationTopic secondRetryDestinationTopic = destinationTopicList.get(2); - assertThat(secondRetryDestinationTopic.isSingleTopicRetry()).isFalse(); + assertThat(secondRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(secondRetryDestinationTopic.getDestinationDelay()).isEqualTo(5000); DestinationTopic.Properties dltProperties = propertiesList.get(3); @@ -292,7 +293,7 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); // then IntStream.range(1, maxAttempts) @@ -313,7 +314,7 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); // then IntStream.range(1, maxAttempts) @@ -332,13 +333,15 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); // when - List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, - classifier, numPartitions, kafkaOperations, - FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, defaultTopicSuffixingStrategy, -1).createProperties(); + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); // then + assertThat(factory.retryTopicsAmount() == 4).isTrue(); assertThat(propertiesList.size() == 6).isTrue(); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); assertThat(propertiesList.get(1).suffix()).isEqualTo(retryTopicSuffix + "-1000"); @@ -347,4 +350,75 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { assertThat(propertiesList.get(4).suffix()).isEqualTo(retryTopicSuffix + "-3000-1"); assertThat(propertiesList.get(5).suffix()).isEqualTo(dltSuffix); } + + @Test + void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { + + // setup + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(2); + backOffPolicy.setMaxInterval(3000); + int maxAttempts = 5; + List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); + + // when + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); + + // then + assertThat(factory.retryTopicsAmount()).isEqualTo(3); + assertThat(propertiesList.size()).isEqualTo(5); + assertThat(propertiesList.get(0).suffix()).isEqualTo(""); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true); + assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); + } + + @Test + void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { + + // setup + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(2); + backOffPolicy.setMaxInterval(3000); + int maxAttempts = 5; + List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); + + // when + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); + + // then + assertThat(factory.retryTopicsAmount()).isEqualTo(3); + assertThat(propertiesList.size()).isEqualTo(5); + assertThat(propertiesList.get(0).suffix()).isEqualTo(""); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true); + assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); + } + + private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, + Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic) { + assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); + DestinationTopic topic = new DestinationTopic("irrelevant" + topicProperties.suffix(), topicProperties); + assertThat(topic.isDltTopic()).isFalse(); + assertThat(topic.isReusableRetryTopic()).isEqualTo(expectedReusableTopic); + assertThat(topic.getDestinationDelay()).isEqualTo(expectedDelay); + assertThat(topic.getDestinationPartitions()).isEqualTo(numPartitions); + assertThat(topic.shouldRetryOn(0, new IllegalArgumentException())).isTrue(); + assertThat(topic.shouldRetryOn(maxAttempts, new IllegalArgumentException())).isFalse(); + assertThat(topic.shouldRetryOn(0, new RuntimeException())).isFalse(); + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 4f5dd74fd4..287d3ad693 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -56,12 +56,12 @@ public class DestinationTopicTests { // MaxAttempts - private final int maxAttempts = 3; + protected final int maxAttempts = 3; // DestinationTopic Properties protected DestinationTopic.Properties mainTopicProps = - new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1, + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); protected DestinationTopic.Properties firstRetryProps = @@ -80,7 +80,7 @@ public class DestinationTopicTests { .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); protected DestinationTopic.Properties mainTopicProps2 = - new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1, + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties firstRetryProps2 = @@ -99,7 +99,7 @@ public class DestinationTopicTests { .asList(mainTopicProps2, firstRetryProps2, secondRetryProps2, dltTopicProps2); protected DestinationTopic.Properties mainTopicProps3 = - new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1, + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties firstRetryProps3 = @@ -113,6 +113,20 @@ public class DestinationTopicTests { protected List allProps3 = Arrays .asList(mainTopicProps3, firstRetryProps3, secondRetryProps3); + protected DestinationTopic.Properties mainTopicProps4 = + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties singleFixedRetryTopicProps4 = + new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties dltTopicProps4 = + new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout); + + protected List allProps4 = Arrays + .asList(mainTopicProps4, singleFixedRetryTopicProps4, dltTopicProps4); // Holders @@ -160,6 +174,20 @@ public class DestinationTopicTests { protected List allThirdDestinationHolders = Arrays .asList(mainDestinationHolder3, firstRetryDestinationHolder3, secondRetryDestinationHolder3); + protected final static String FOURTH_TOPIC = "fourthTopic"; + + protected PropsHolder mainDestinationHolder4 = + new PropsHolder(FOURTH_TOPIC, mainTopicProps4); + + protected PropsHolder singleFixedRetryDestinationHolder4 = + new PropsHolder(FOURTH_TOPIC, singleFixedRetryTopicProps4); + + protected PropsHolder dltDestinationHolder4 = + new PropsHolder(FOURTH_TOPIC, dltTopicProps4); + + protected List allFourthDestinationHolders = Arrays + .asList(mainDestinationHolder4, singleFixedRetryDestinationHolder4, dltDestinationHolder4); + // DestinationTopics protected DestinationTopic mainDestinationTopic = @@ -216,6 +244,18 @@ public class DestinationTopicTests { protected List allThirdDestinationTopics = Arrays .asList(mainDestinationTopic3, firstRetryDestinationTopic3, secondRetryDestinationTopic3); + protected DestinationTopic mainDestinationTopic4 = + new DestinationTopic(FOURTH_TOPIC + mainTopicProps4.suffix(), mainTopicProps4); + + protected DestinationTopic singleFixedRetryDestinationTopic4 = + new DestinationTopic(FOURTH_TOPIC + singleFixedRetryTopicProps4.suffix(), singleFixedRetryTopicProps4); + + protected DestinationTopic dltDestinationTopic4 = + new DestinationTopic(FOURTH_TOPIC + dltTopicProps4.suffix(), dltTopicProps4); + + protected List allFourthDestinationTopics = Arrays + .asList(mainDestinationTopic4, singleFixedRetryDestinationTopic4, dltDestinationTopic4); + // Classifiers private final BinaryExceptionClassifier allowListClassifier = new BinaryExceptionClassifierBuilder() From c5beccec4ccebd316f1b831e92a69d9462b88dfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Mon, 5 Dec 2022 00:52:16 -0300 Subject: [PATCH 2/9] GH-2496 Reference documentation typos. --- spring-kafka-docs/src/main/asciidoc/retrytopic.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index b0f6066e73..347135b7bc 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -656,7 +656,7 @@ Examples: NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ..., retry-n. Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1. -You can <>, choose whether to append <>, use a <>, and use a <> when using exponential backoffs. +You can <>, choose whether to append <>, use a <>, and use a <> when using exponential backoffs. [[retry-topics-and-dlt-suffixes]] ===== Retry Topics and Dlt Suffixes @@ -763,7 +763,7 @@ If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. -By default, the default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index. +The default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index. For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be: From 4c6cf75da22b6df3df348381c6f93849c05402ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Tue, 31 Jan 2023 21:47:03 -0300 Subject: [PATCH 3/9] GH-2496: Deprecation of FixedDelayStrategy. RetryTopic docs and code fixes and small refactorings. --- .../src/main/asciidoc/index.adoc | 2 +- .../src/main/asciidoc/retrytopic.adoc | 28 ++++--- .../kafka/annotation/RetryableTopic.java | 4 +- .../DestinationTopicPropertiesFactory.java | 35 ++++++--- .../kafka/retrytopic/FixedDelayStrategy.java | 4 +- .../RetryTopicConfigurationBuilder.java | 19 +++-- .../SameIntervalTopicReuseStrategy.java | 10 ++- ...estinationTopicPropertiesFactoryTests.java | 75 +++++++++++++------ 8 files changed, 125 insertions(+), 52 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/index.adoc b/spring-kafka-docs/src/main/asciidoc/index.adoc index 6226ded78c..73e74ccbc8 100644 --- a/spring-kafka-docs/src/main/asciidoc/index.adoc +++ b/spring-kafka-docs/src/main/asciidoc/index.adoc @@ -17,7 +17,7 @@ ifdef::backend-pdf[] NOTE: This documentation is also available as https://docs.spring.io/spring-kafka/docs/{project-version}/reference/html/index.html[HTML]. endif::[] -(C) 2016 - 2022 VMware, Inc. +(C) 2016 - 2023 VMware, Inc. Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically. diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index b881d13da0..a673ca8397 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -146,7 +146,8 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate>, to handle deserilialization exceptions, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions. The generic value type of the template should be `Object`. @@ -650,7 +651,7 @@ IMPORTANT: Note that the blocking retries behavior is allowlist - you add the ex IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration. -==== Topic Amount and Naming +==== Topic Amount Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic. @@ -660,7 +661,8 @@ Examples: "my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix". -NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ..., retry-n. Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1. +NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n. +Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1. You can <>, choose whether to append <>, use a <>, and use a <> when using exponential backoffs. @@ -733,6 +735,8 @@ NOTE: The default behavior is to suffix with the delay values, except for fixed If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. +NOTE: `FixedDelayStrategy` is now deprecated, and will be replaced by `SameIntervalTopicReuseStrategy` in a future release. + ==== [source, java] ---- @@ -754,7 +758,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate templa .fixedBackoff(3000) .maxAttempts(5) .useSingleTopicForFixedDelays() - .build(); + .create(template); } ---- ==== @@ -765,7 +769,9 @@ NOTE: The default behavior is creating separate retry topics for each attempt, a [[single-topic-maxinterval-delay]] ===== Single Topic for maxInterval Exponential Delay -If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`. This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended. +If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`. + +This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended. NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. @@ -814,14 +820,15 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate templa .exponentialBackoff(1000, 2, 16000) .maxAttempts(230) .useSingleTopicForSameIntervals() - .build(); + .create(template); } ---- ==== ===== Custom naming strategies -More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: +More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. +The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: ==== [source, java] @@ -901,7 +908,9 @@ The framework will configure and use a separate set of retry topics for each lis ==== Dlt Strategies -The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails. +The framework provides a few strategies for working with DLTs. +You can provide a method for DLT processing, use the default logging method, or have no DLT at all. +Also you can choose what happens if DLT processing fails. ===== Dlt Processing Method @@ -1110,7 +1119,8 @@ Use the `DestinationTopicResolver` interface if you need to weigh in these facto [[change-kboe-logging-level]] ==== Changing KafkaBackOffException Logging Level -When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class. +When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. +Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class. For example, to change the logging level to WARN you might add: diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 7479993fc0..7f02a013c7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -194,7 +194,9 @@ /** * Whether to use a single or multiple topics when using a fixed delay. * @return the fixed delay strategy. + * @deprecated in a future release, will be replaced by {@link #sameIntervalTopicReuseStrategy()}. */ + @Deprecated(forRemoval = true) // in 3.1 FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS; /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 083d0e0bae..47f5103901 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,6 +91,17 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.maxAttempts = this.backOffValues.size() + 1; } + public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, + BinaryExceptionClassifier exceptionClassifier, + int numPartitions, KafkaOperations kafkaOperations, + FixedDelayStrategy fixedDelayStrategy, + DltStrategy dltStrategy, + TopicSuffixingStrategy topicSuffixingStrategy, + long timeout) { + this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations, + fixedDelayStrategy, dltStrategy, topicSuffixingStrategy, SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + timeout); + } /** * Set to false to not start the DLT handler. * @param autoStart false to not start. @@ -111,9 +122,9 @@ public List createProperties() { private List createPropertiesForFixedDelaySingleTopic() { return isNoDltStrategy() ? Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, getShouldRetryOn())) + createRetryProperties(1, getShouldRetryOn())) : Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, getShouldRetryOn()), + createRetryProperties(1, getShouldRetryOn()), createDltProperties()); } @@ -134,10 +145,10 @@ private List createPropertiesForDefaultTopicStrateg int retryTopicsAmount = retryTopicsAmount(); return IntStream.rangeClosed(0, isNoDltStrategy() - ? retryTopicsAmount - : retryTopicsAmount + 1) - .mapToObj(this::createTopicProperties) - .collect(Collectors.toList()); + ? retryTopicsAmount + : retryTopicsAmount + 1) + .mapToObj(this::createTopicProperties) + .collect(Collectors.toList()); } int retryTopicsAmount() { @@ -167,7 +178,7 @@ private DestinationTopic.Properties createTopicProperties(int index) { return index == 0 ? createMainTopicProperties() : (index <= this.retryTopicsAmount()) - ? createRetryProperties(index, DestinationTopic.Type.RETRY, shouldRetryOn) + ? createRetryProperties(index, shouldRetryOn) : createDltProperties(); } @@ -187,13 +198,12 @@ private BiPredicate getShouldRetryOn() { } private DestinationTopic.Properties createRetryProperties(int index, - DestinationTopic.Type topicType, BiPredicate shouldRetryOn) { int indexInBackoffValues = index - 1; Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); DestinationTopic.Type topicTypeToUse = isDelayWithReusedTopic(thisBackOffValue) ? Type.REUSABLE_RETRY_TOPIC - : topicType; + : Type.RETRY; return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues, getTopicSuffix(indexInBackoffValues, thisBackOffValue)); } @@ -215,8 +225,9 @@ private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBack : "-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue); } - private boolean isDelayWithReusedTopic(Long thisBackOffValue) { - return hasDuplicates(thisBackOffValue) && isSingleTopicSameIntervalTopicReuseStrategy(); + private boolean isDelayWithReusedTopic(Long backoffValue) { + return ((isSingleTopicFixedDelay()) || + (hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy())); } private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java index 1dce512510..6f779249e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,10 @@ * * @author Tomaz Fernandes * @since 2.7 + * @deprecated in a future release, will be replaced by {@link SameIntervalTopicReuseStrategy}. * */ +@Deprecated(forRemoval = true) // in 3.1 public enum FixedDelayStrategy { /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 6b67d0d279..d1557d0c41 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -244,7 +244,7 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt } /** - * Configure the retry topic name {@link SameIntervalTopicReuseStrategy}. + * Configure the {@link SameIntervalTopicReuseStrategy}. * @param sameIntervalTopicReuseStrategy the strategy. * @return the builder. */ @@ -254,11 +254,16 @@ public RetryTopicConfigurationBuilder sameIntervalTopicReuseStrategy(SameInterva } /** - * For exponential backoff, configure the use of a single retry topic - * for the attempts that have the {@code maxInterval}. + * Configure the use of a single retry topic + * for the attempts that have the same backoff interval + * (as long as these attempts are in the middle of the chain). + * + * Currently used only for the last retries of exponential backoff, + * and in a future release this will dictate whether to use + * a single retry topic for fixed backoff. * * @return the builder. - * @see SameIntervalTopicReuseStrategy#SINGLE_TOPIC + * @see SameIntervalTopicReuseStrategy * */ public RetryTopicConfigurationBuilder useSingleTopicForSameIntervals() { @@ -399,8 +404,10 @@ public RetryTopicConfigurationBuilder fixedBackOff(int interval) { /** * Configure the use of a single retry topic with fixed delays. * @return the builder. + * @deprecated in a future release, configuration for single retry topic with fixed delays will have to be done with {@link #useSingleTopicForSameIntervals()}. * @see FixedDelayStrategy#SINGLE_TOPIC */ + @Deprecated(forRemoval = true) // in 3.1 public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { this.fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; return this; @@ -411,7 +418,9 @@ public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { * {@link FixedDelayStrategy#MULTIPLE_TOPICS}. * @param delayStrategy the delay strategy. * @return the builder. + * @deprecated in a future release, retry topic reuse configuration for fixed delays will have to be done with {@link #sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy)}. */ + @Deprecated(forRemoval = true) // in 3.1 public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays(FixedDelayStrategy delayStrategy) { this.fixedDelayStrategy = delayStrategy; return this; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java index 7af3218183..ed6eff0929 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,13 @@ /** * - * Strategy for topic reuse when multiple, sequential retries have the same backoff interval. + * Strategy for topic reuse when multiple, sequential retries have the same backoff + * interval. + * + *

+ * It can be used only when the retries that have the same interval are located + * in the end of the retry chain (it cannot be used for retries with the same + * interval in the middle of the retry chain). * * @author João Lima * @since 3.1 diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 3c488efe78..875c516de7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -94,8 +94,8 @@ void shouldCreateMainAndDltProperties() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET) - .createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET).createProperties(); // then assertThat(propertiesList.size() == 2).isTrue(); @@ -137,8 +137,8 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET) - .createProperties(); + dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, + multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET).createProperties(); List destinationTopicList = propertiesList .stream() @@ -184,9 +184,10 @@ void shouldNotCreateDltProperties() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET) - .createProperties(); + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, + numPartitions, kafkaOperations, fixedDelayStrategy, noDltStrategy, + TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET).createProperties(); // then assertThat(propertiesList.size() == 3).isTrue(); @@ -206,7 +207,8 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -247,7 +249,8 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -293,11 +296,12 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, + multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); // then - IntStream.range(1, maxAttempts) - .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); + IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) + .isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); } @Test @@ -314,11 +318,13 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + -1).createProperties(); // then IntStream.range(1, maxAttempts) - .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); + .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + + "-" + String.valueOf(index - 1))); } @Test @@ -333,8 +339,8 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); // when - DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, - classifier, numPartitions, kafkaOperations, + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); @@ -363,8 +369,8 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); // when - DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, - classifier, numPartitions, kafkaOperations, + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); @@ -392,8 +398,8 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); // when - DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, - classifier, numPartitions, kafkaOperations, + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); @@ -409,6 +415,33 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } + @Test + void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { + + // setup + List backOffValues = List.of(1000L, 2000L, 2000L, 2000L, 3000L); + int maxAttempts = backOffValues.size() + 1; + + // when + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.SINGLE_TOPIC, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); + + // then + assertThat(factory.retryTopicsAmount()).isEqualTo(5); + assertThat(propertiesList.size()).isEqualTo(7); + assertThat(propertiesList.get(0).suffix()).isEqualTo(""); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false); + assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false); + assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false); + assertThat(propertiesList.get(6).suffix()).isEqualTo(dltSuffix); + } + private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic) { assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); From 20865a67ebee5ecd1e985606f296fbcfb87e3dc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Mon, 6 Feb 2023 22:36:33 -0300 Subject: [PATCH 4/9] GH-2496: Restoring DestinationTopic.isSingleTopicRetry() --- .../kafka/retrytopic/DestinationTopic.java | 23 +++++++++++--- .../DestinationTopicPropertiesFactory.java | 6 ++-- ...estinationTopicPropertiesFactoryTests.java | 31 ++++++++++++------- .../retrytopic/DestinationTopicTests.java | 30 +++++++++--------- 4 files changed, 55 insertions(+), 35 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index f6a6804f49..0a5b118412 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,12 @@ public boolean isReusableRetryTopic() { return Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type); } + @Deprecated(forRemoval = true) // in 3.1 + public boolean isSingleTopicRetry() { + return ((Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type)) && + (Integer.valueOf(1).equals(this.properties.firstAttemptIndex))); + } + public boolean isMainTopic() { return Type.MAIN.equals(this.properties.type); } @@ -137,6 +143,8 @@ public static class Properties { private final long timeout; + private final Integer firstAttemptIndex; + @Nullable private final Boolean autoStartDltHandler; @@ -152,15 +160,17 @@ public static class Properties { * @param kafkaOperations the {@link KafkaOperations}. * @param shouldRetryOn the exception classifications. * @param timeout the timeout. + * @param firstAttemptIndex the first attempt this topic is used with. */ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, KafkaOperations kafkaOperations, - BiPredicate shouldRetryOn, long timeout) { + BiPredicate shouldRetryOn, long timeout, + Integer firstAttemptIndex) { this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, - timeout, null); + timeout, null, firstAttemptIndex); } /** @@ -173,7 +183,7 @@ public Properties(long delayMs, String suffix, Type type, public Properties(Properties sourceProperties, String suffix, Type type) { this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions, sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn, - sourceProperties.timeout, null); + sourceProperties.timeout, null, sourceProperties.firstAttemptIndex); } /** @@ -188,13 +198,15 @@ public Properties(Properties sourceProperties, String suffix, Type type) { * @param shouldRetryOn the exception classifications. * @param timeout the timeout. * @param autoStartDltHandler whether or not to start the DLT handler. + * @param firstAttemptIndex the first attempt this topic is used with. * @since 2.8 */ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, KafkaOperations kafkaOperations, - BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, + Integer firstAttemptIndex) { this.delayMs = delayMs; this.suffix = suffix; @@ -206,6 +218,7 @@ public Properties(long delayMs, String suffix, Type type, this.shouldRetryOn = shouldRetryOn; this.timeout = timeout; this.autoStartDltHandler = autoStartDltHandler; + this.firstAttemptIndex = firstAttemptIndex; } public boolean isDltTopic() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 47f5103901..5713e1694f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -184,13 +184,13 @@ private DestinationTopic.Properties createTopicProperties(int index) { private DestinationTopic.Properties createMainTopicProperties() { return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts, - this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout); + this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout, 0); } private DestinationTopic.Properties createDltProperties() { return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, null); } private BiPredicate getShouldRetryOn() { @@ -255,7 +255,7 @@ private DestinationTopic.Properties createProperties(DestinationTopic.Type topic String suffix) { return new DestinationTopic.Properties(this.backOffValues.get(indexInBackoffValues), suffix, topicType, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, shouldRetryOn, this.timeout); + this.kafkaOperations, shouldRetryOn, this.timeout, indexInBackoffValues + 1); } private boolean isFixedDelay() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 875c516de7..40a3c0b29c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -151,6 +151,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-1000"); assertThat(firstRetryProperties.isDltTopic()).isFalse(); DestinationTopic firstRetryDestinationTopic = destinationTopicList.get(1); + assertThat(firstRetryDestinationTopic.isSingleTopicRetry()).isFalse(); assertThat(firstRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(firstRetryDestinationTopic.getDestinationDelay()).isEqualTo(1000); assertThat(firstRetryDestinationTopic.getDestinationPartitions()).isEqualTo(numPartitions); @@ -162,6 +163,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { assertThat(secondRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-2000"); assertThat(secondRetryProperties.isDltTopic()).isFalse(); DestinationTopic secondRetryDestinationTopic = destinationTopicList.get(2); + assertThat(secondRetryDestinationTopic.isSingleTopicRetry()).isFalse(); assertThat(secondRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(secondRetryDestinationTopic.getDestinationDelay()).isEqualTo(2000); assertThat(secondRetryDestinationTopic.getDestinationPartitions()).isEqualTo(numPartitions); @@ -224,6 +226,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); + assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); assertThat(retryDestinationTopic.isReusableRetryTopic()).isTrue(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); @@ -266,12 +269,14 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-0"); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); + assertThat(retryDestinationTopic.isSingleTopicRetry()).isFalse(); assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(5000); DestinationTopic.Properties secondRetryProperties = propertiesList.get(2); assertThat(secondRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-1"); DestinationTopic secondRetryDestinationTopic = destinationTopicList.get(2); + assertThat(secondRetryDestinationTopic.isSingleTopicRetry()).isFalse(); assertThat(secondRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(secondRetryDestinationTopic.getDestinationDelay()).isEqualTo(5000); @@ -380,9 +385,9 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { assertThat(factory.retryTopicsAmount()).isEqualTo(3); assertThat(propertiesList.size()).isEqualTo(5); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false, false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true, false); assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } @@ -409,9 +414,9 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { assertThat(factory.retryTopicsAmount()).isEqualTo(3); assertThat(propertiesList.size()).isEqualTo(5); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false, false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false, false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true, false); assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } @@ -434,19 +439,21 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { assertThat(factory.retryTopicsAmount()).isEqualTo(5); assertThat(propertiesList.size()).isEqualTo(7); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false); - assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false); - assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false, false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false, false); + assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false, false); + assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false, false); assertThat(propertiesList.get(6).suffix()).isEqualTo(dltSuffix); } private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, - Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic) { + Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic, + boolean expectedIsSingleTopicRetry) { assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); DestinationTopic topic = new DestinationTopic("irrelevant" + topicProperties.suffix(), topicProperties); assertThat(topic.isDltTopic()).isFalse(); + assertThat(topic.isSingleTopicRetry()).isEqualTo(expectedIsSingleTopicRetry); assertThat(topic.isReusableRetryTopic()).isEqualTo(expectedReusableTopic); assertThat(topic.getDestinationDelay()).isEqualTo(expectedDelay); assertThat(topic.getDestinationPartitions()).isEqualTo(numPartitions); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 287d3ad693..3be26e9d97 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,68 +62,68 @@ public class DestinationTopicTests { protected DestinationTopic.Properties mainTopicProps = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout, 0); protected DestinationTopic.Properties firstRetryProps = new DestinationTopic.Properties(1000, retrySuffix + "-1000", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout, 1); protected DestinationTopic.Properties secondRetryProps = new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout, 2); protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null); protected List allProps = Arrays .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); protected DestinationTopic.Properties mainTopicProps2 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 0); protected DestinationTopic.Properties firstRetryProps2 = new DestinationTopic.Properties(1000, retrySuffix + "-0", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 1); protected DestinationTopic.Properties secondRetryProps2 = new DestinationTopic.Properties(1000, retrySuffix + "-1", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 2); protected DestinationTopic.Properties dltTopicProps2 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); protected List allProps2 = Arrays .asList(mainTopicProps2, firstRetryProps2, secondRetryProps2, dltTopicProps2); protected DestinationTopic.Properties mainTopicProps3 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout, 0); protected DestinationTopic.Properties firstRetryProps3 = new DestinationTopic.Properties(1000, retrySuffix + "-0", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout, 1); protected DestinationTopic.Properties secondRetryProps3 = new DestinationTopic.Properties(1000, retrySuffix + "-1", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout, 2); protected List allProps3 = Arrays .asList(mainTopicProps3, firstRetryProps3, secondRetryProps3); protected DestinationTopic.Properties mainTopicProps4 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 0); protected DestinationTopic.Properties singleFixedRetryTopicProps4 = new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 1); protected DestinationTopic.Properties dltTopicProps4 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); protected List allProps4 = Arrays .asList(mainTopicProps4, singleFixedRetryTopicProps4, dltTopicProps4); From d466fe5854e3bbb9cd9b4d402c80df6a5dc61790 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Mon, 27 Feb 2023 22:54:57 -0300 Subject: [PATCH 5/9] GH-2496: Restoring DestinationTopic.Type.SINGLE_TOPIC_RETRY. --- .../DefaultDestinationTopicResolver.java | 3 +- .../kafka/retrytopic/DestinationTopic.java | 28 ++++----- .../DestinationTopicPropertiesFactory.java | 14 +++-- .../DefaultDestinationTopicResolverTests.java | 15 ++++- ...estinationTopicPropertiesFactoryTests.java | 2 +- .../retrytopic/DestinationTopicTests.java | 57 ++++++++++++------- 6 files changed, 76 insertions(+), 43 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 031aa34987..f1ca235a95 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -134,7 +134,8 @@ && isNotFatalException(e) } private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { - return destinationTopicHolder.getSourceDestination().isReusableRetryTopic() + return ((destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) || + (destinationTopicHolder.getSourceDestination().isSingleTopicRetry())) ? destinationTopicHolder.getSourceDestination() : destinationTopicHolder.getNextDestination(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 0a5b118412..85250734c1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -74,8 +74,7 @@ public boolean isReusableRetryTopic() { @Deprecated(forRemoval = true) // in 3.1 public boolean isSingleTopicRetry() { - return ((Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type)) && - (Integer.valueOf(1).equals(this.properties.firstAttemptIndex))); + return Type.SINGLE_TOPIC_RETRY.equals(this.properties.type); } public boolean isMainTopic() { @@ -143,8 +142,6 @@ public static class Properties { private final long timeout; - private final Integer firstAttemptIndex; - @Nullable private final Boolean autoStartDltHandler; @@ -160,17 +157,15 @@ public static class Properties { * @param kafkaOperations the {@link KafkaOperations}. * @param shouldRetryOn the exception classifications. * @param timeout the timeout. - * @param firstAttemptIndex the first attempt this topic is used with. */ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, KafkaOperations kafkaOperations, - BiPredicate shouldRetryOn, long timeout, - Integer firstAttemptIndex) { + BiPredicate shouldRetryOn, long timeout) { this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, - timeout, null, firstAttemptIndex); + timeout, null); } /** @@ -183,7 +178,7 @@ public Properties(long delayMs, String suffix, Type type, public Properties(Properties sourceProperties, String suffix, Type type) { this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions, sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn, - sourceProperties.timeout, null, sourceProperties.firstAttemptIndex); + sourceProperties.timeout, null); } /** @@ -198,15 +193,13 @@ public Properties(Properties sourceProperties, String suffix, Type type) { * @param shouldRetryOn the exception classifications. * @param timeout the timeout. * @param autoStartDltHandler whether or not to start the DLT handler. - * @param firstAttemptIndex the first attempt this topic is used with. * @since 2.8 */ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, KafkaOperations kafkaOperations, - BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, - Integer firstAttemptIndex) { + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { this.delayMs = delayMs; this.suffix = suffix; @@ -218,7 +211,6 @@ public Properties(long delayMs, String suffix, Type type, this.shouldRetryOn = shouldRetryOn; this.timeout = timeout; this.autoStartDltHandler = autoStartDltHandler; - this.firstAttemptIndex = firstAttemptIndex; } public boolean isDltTopic() { @@ -293,6 +285,14 @@ public boolean isMainEndpoint() { } enum Type { - MAIN, RETRY, REUSABLE_RETRY_TOPIC, DLT, NO_OPS + MAIN, RETRY, + /** + * A single retry topic for all retries. + * + * @deprecated Use {@code REUSABLE_RETRY_TOPIC} instead. + */ + @Deprecated + SINGLE_TOPIC_RETRY, + REUSABLE_RETRY_TOPIC, DLT, NO_OPS } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 5713e1694f..da5760cced 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -184,13 +184,13 @@ private DestinationTopic.Properties createTopicProperties(int index) { private DestinationTopic.Properties createMainTopicProperties() { return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts, - this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout, 0); + this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout); } private DestinationTopic.Properties createDltProperties() { return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, null); + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); } private BiPredicate getShouldRetryOn() { @@ -201,9 +201,11 @@ private DestinationTopic.Properties createRetryProperties(int index, BiPredicate shouldRetryOn) { int indexInBackoffValues = index - 1; Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); - DestinationTopic.Type topicTypeToUse = isDelayWithReusedTopic(thisBackOffValue) - ? Type.REUSABLE_RETRY_TOPIC - : Type.RETRY; + DestinationTopic.Type topicTypeToUse = isSingleTopicFixedDelay() + ? Type.SINGLE_TOPIC_RETRY + : isDelayWithReusedTopic(thisBackOffValue) + ? Type.REUSABLE_RETRY_TOPIC + : Type.RETRY; return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues, getTopicSuffix(indexInBackoffValues, thisBackOffValue)); } @@ -255,7 +257,7 @@ private DestinationTopic.Properties createProperties(DestinationTopic.Type topic String suffix) { return new DestinationTopic.Properties(this.backOffValues.get(indexInBackoffValues), suffix, topicType, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, shouldRetryOn, this.timeout, indexInBackoffValues + 1); + this.kafkaOperations, shouldRetryOn, this.timeout); } private boolean isFixedDelay() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index 71366a03d3..ea1fa3334b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -70,6 +70,7 @@ public void setup() { defaultDestinationTopicContainer.addDestinationTopics("id", allSecondDestinationTopics); defaultDestinationTopicContainer.addDestinationTopics("id", allThirdDestinationTopics); defaultDestinationTopicContainer.addDestinationTopics("id", allFourthDestinationTopics); + defaultDestinationTopicContainer.addDestinationTopics("id", allFifthDestinationTopics); } @@ -112,6 +113,18 @@ void shouldResolveRetryDestination() { assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", singleFixedRetryDestinationTopic4.getDestinationName(), maxAttempts, new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic4); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", mainDestinationTopic5.getDestinationName(), 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(reusableRetryDestinationTopic5); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", reusableRetryDestinationTopic5.getDestinationName(), maxAttempts - 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(reusableRetryDestinationTopic5); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", reusableRetryDestinationTopic5.getDestinationName(), maxAttempts, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic5); } @Test @@ -212,7 +225,7 @@ void shouldThrowIfMultipleReusableRetryTopicsAdded() { destinationResolver.addDestinationTopics("id", allFirstDestinationsTopics); List destinationTopics = Arrays - .asList(mainDestinationTopic4, singleFixedRetryDestinationTopic4, singleFixedRetryDestinationTopic4, dltDestinationTopic4); + .asList(mainDestinationTopic5, reusableRetryDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5); assertThatIllegalArgumentException().isThrownBy( () -> destinationResolver.addDestinationTopics("id", destinationTopics)) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 40a3c0b29c..7effe3ccb4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -227,7 +227,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); - assertThat(retryDestinationTopic.isReusableRetryTopic()).isTrue(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); DestinationTopic.Properties dltProperties = propertiesList.get(2); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 3be26e9d97..7c6978af3b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -62,15 +62,15 @@ public class DestinationTopicTests { protected DestinationTopic.Properties mainTopicProps = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout, 0); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); protected DestinationTopic.Properties firstRetryProps = new DestinationTopic.Properties(1000, retrySuffix + "-1000", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout, 1); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); protected DestinationTopic.Properties secondRetryProps = new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout, 2); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, @@ -81,15 +81,15 @@ public class DestinationTopicTests { protected DestinationTopic.Properties mainTopicProps2 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 0); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties firstRetryProps2 = new DestinationTopic.Properties(1000, retrySuffix + "-0", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 1); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties secondRetryProps2 = new DestinationTopic.Properties(1000, retrySuffix + "-1", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 2); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties dltTopicProps2 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, @@ -100,33 +100,42 @@ public class DestinationTopicTests { protected DestinationTopic.Properties mainTopicProps3 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout, 0); + DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties firstRetryProps3 = new DestinationTopic.Properties(1000, retrySuffix + "-0", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout, 1); + DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties secondRetryProps3 = new DestinationTopic.Properties(1000, retrySuffix + "-1", DestinationTopic.Type.RETRY, 4, 1, - DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout, 2); + DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); protected List allProps3 = Arrays .asList(mainTopicProps3, firstRetryProps3, secondRetryProps3); protected DestinationTopic.Properties mainTopicProps4 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 0); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties singleFixedRetryTopicProps4 = - new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout, 1); + new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.SINGLE_TOPIC_RETRY, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties dltTopicProps4 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); - protected List allProps4 = Arrays - .asList(mainTopicProps4, singleFixedRetryTopicProps4, dltTopicProps4); + protected DestinationTopic.Properties mainTopicProps5 = + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties reusableRetryTopicProps5 = + new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties dltTopicProps5 = + new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); // Holders @@ -179,15 +188,9 @@ public class DestinationTopicTests { protected PropsHolder mainDestinationHolder4 = new PropsHolder(FOURTH_TOPIC, mainTopicProps4); - protected PropsHolder singleFixedRetryDestinationHolder4 = - new PropsHolder(FOURTH_TOPIC, singleFixedRetryTopicProps4); - protected PropsHolder dltDestinationHolder4 = new PropsHolder(FOURTH_TOPIC, dltTopicProps4); - protected List allFourthDestinationHolders = Arrays - .asList(mainDestinationHolder4, singleFixedRetryDestinationHolder4, dltDestinationHolder4); - // DestinationTopics protected DestinationTopic mainDestinationTopic = @@ -256,6 +259,20 @@ public class DestinationTopicTests { protected List allFourthDestinationTopics = Arrays .asList(mainDestinationTopic4, singleFixedRetryDestinationTopic4, dltDestinationTopic4); + protected final static String FIFTH_TOPIC = "fifthTopic"; + + protected DestinationTopic mainDestinationTopic5 = + new DestinationTopic(FIFTH_TOPIC + mainTopicProps5.suffix(), mainTopicProps5); + + protected DestinationTopic reusableRetryDestinationTopic5 = + new DestinationTopic(FIFTH_TOPIC + reusableRetryTopicProps5.suffix(), reusableRetryTopicProps5); + + protected DestinationTopic dltDestinationTopic5 = + new DestinationTopic(FIFTH_TOPIC + dltTopicProps5.suffix(), dltTopicProps5); + + protected List allFifthDestinationTopics = Arrays + .asList(mainDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5); + // Classifiers private final BinaryExceptionClassifier allowListClassifier = new BinaryExceptionClassifierBuilder() From f7ff884881c9ce11883bbb8f071da15975be3c44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Mon, 27 Feb 2023 23:46:40 -0300 Subject: [PATCH 6/9] GH-2496: Removal of "forRemoval" for FixedDelayStrategy --- .../kafka/annotation/RetryableTopic.java | 9 +++- .../kafka/retrytopic/DestinationTopic.java | 9 +++- .../DestinationTopicPropertiesFactory.java | 2 +- .../kafka/retrytopic/FixedDelayStrategy.java | 4 +- .../RetryTopicConfigurationBuilder.java | 14 +++++-- ...estinationTopicPropertiesFactoryTests.java | 42 +++++++++++++++++++ 6 files changed, 70 insertions(+), 10 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 7f02a013c7..c0952af5a6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -39,6 +39,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Fabio da Silva Jr. + * @author João Lima * @since 2.7 * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer @@ -181,6 +182,10 @@ /** * Topic reuse strategy for sequential attempts made with a same backoff interval. + * + *

Note: for fixed backoffs, when this is configured as + * {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over + * the configuration in {@link #fixedDelayTopicStrategy()}. * @return the strategy. */ SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; @@ -194,9 +199,9 @@ /** * Whether to use a single or multiple topics when using a fixed delay. * @return the fixed delay strategy. - * @deprecated in a future release, will be replaced by {@link #sameIntervalTopicReuseStrategy()}. + * @deprecated in favor of {@link #sameIntervalTopicReuseStrategy()}. */ - @Deprecated(forRemoval = true) // in 3.1 + @Deprecated FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS; /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 85250734c1..d61ab02d54 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -72,7 +72,14 @@ public boolean isReusableRetryTopic() { return Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type); } - @Deprecated(forRemoval = true) // in 3.1 + /** + * Whether this is a single retry topic. + * + * @return whether this is a single retry topic. + * @deprecated in favor of using {@link DestinationTopic.Type#REUSABLE_RETRY_TOPIC} + * and {@link #isReusableRetryTopic()}. + */ + @Deprecated public boolean isSingleTopicRetry() { return Type.SINGLE_TOPIC_RETRY.equals(this.properties.type); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index da5760cced..8981ce748e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -129,7 +129,7 @@ private List createPropertiesForFixedDelaySingleTop } private boolean isSingleTopicFixedDelay() { - return isFixedDelay() && isSingleTopicStrategy(); + return isFixedDelay() && (isSingleTopicStrategy() || isSingleTopicSameIntervalTopicReuseStrategy()); } private boolean isSingleTopicStrategy() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java index 6f779249e5..e256ad1a5b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java @@ -22,10 +22,10 @@ * * @author Tomaz Fernandes * @since 2.7 - * @deprecated in a future release, will be replaced by {@link SameIntervalTopicReuseStrategy}. + * @deprecated in favor of {@link SameIntervalTopicReuseStrategy}. * */ -@Deprecated(forRemoval = true) // in 3.1 +@Deprecated public enum FixedDelayStrategy { /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index d1557d0c41..fd9e094ccf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -245,6 +245,11 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt /** * Configure the {@link SameIntervalTopicReuseStrategy}. + * + *

Note: for fixed backoffs, when this is configured as + * {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over + * the configuration done through + * {@link #useSingleTopicForFixedDelays(FixedDelayStrategy)}. * @param sameIntervalTopicReuseStrategy the strategy. * @return the builder. */ @@ -404,10 +409,10 @@ public RetryTopicConfigurationBuilder fixedBackOff(int interval) { /** * Configure the use of a single retry topic with fixed delays. * @return the builder. - * @deprecated in a future release, configuration for single retry topic with fixed delays will have to be done with {@link #useSingleTopicForSameIntervals()}. + * @deprecated in favor of {@link #useSingleTopicForSameIntervals()}. * @see FixedDelayStrategy#SINGLE_TOPIC */ - @Deprecated(forRemoval = true) // in 3.1 + @Deprecated public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { this.fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; return this; @@ -418,9 +423,10 @@ public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { * {@link FixedDelayStrategy#MULTIPLE_TOPICS}. * @param delayStrategy the delay strategy. * @return the builder. - * @deprecated in a future release, retry topic reuse configuration for fixed delays will have to be done with {@link #sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy)}. + * @deprecated in favor of + * {@link #sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy)}. */ - @Deprecated(forRemoval = true) // in 3.1 + @Deprecated public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays(FixedDelayStrategy delayStrategy) { this.fixedDelayStrategy = delayStrategy; return this; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 7effe3ccb4..7f4846c644 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -238,6 +238,48 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { assertThat(dltTopic.getDestinationPartitions()).isEqualTo(numPartitions); } + @Test + void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { + + // when + FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); + backOffPolicy.setBackOffPeriod(1000); + int maxAttempts = 5; + + List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); + + List propertiesList = + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, + -1).createProperties(); + + List destinationTopicList = propertiesList + .stream() + .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) + .collect(Collectors.toList()); + + // then + assertThat(propertiesList.size() == 3).isTrue(); + + DestinationTopic mainDestinationTopic = destinationTopicList.get(0); + assertThat(mainDestinationTopic.isMainTopic()).isTrue(); + + DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); + assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); + DestinationTopic retryDestinationTopic = destinationTopicList.get(1); + assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); + assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); + + DestinationTopic.Properties dltProperties = propertiesList.get(2); + assertThat(dltProperties.suffix()).isEqualTo(dltSuffix); + assertThat(dltProperties.isDltTopic()).isTrue(); + DestinationTopic dltTopic = destinationTopicList.get(2); + assertThat(dltTopic.getDestinationDelay()).isEqualTo(0); + assertThat(dltTopic.getDestinationPartitions()).isEqualTo(numPartitions); + } + @Test void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { From 6cd53ef465cf476bb413daff19940696d51dfd28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Tue, 28 Feb 2023 00:06:25 -0300 Subject: [PATCH 7/9] GH-2496: Supressing deprecation warnings --- .../kafka/annotation/RetryableTopic.java | 3 +-- .../RetryableTopicAnnotationProcessor.java | 3 ++- .../retrytopic/DefaultDestinationTopicResolver.java | 3 ++- .../DestinationTopicPropertiesFactory.java | 5 +++++ .../retrytopic/RetryTopicConfigurationBuilder.java | 1 + .../DestinationTopicPropertiesFactoryTests.java | 12 ++++++++++++ .../kafka/retrytopic/DestinationTopicTests.java | 1 + .../RetryTopicExceptionRoutingIntegrationTests.java | 4 +++- .../kafka/retrytopic/RetryTopicIntegrationTests.java | 3 ++- 9 files changed, 29 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index c0952af5a6..92c27c7d32 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -23,7 +23,6 @@ import java.lang.annotation.Target; import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.kafka.retrytopic.FixedDelayStrategy; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; @@ -202,7 +201,7 @@ * @deprecated in favor of {@link #sameIntervalTopicReuseStrategy()}. */ @Deprecated - FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS; + org.springframework.kafka.retrytopic.FixedDelayStrategy fixedDelayTopicStrategy() default org.springframework.kafka.retrytopic.FixedDelayStrategy.MULTIPLE_TOPICS; /** * Override the container factory's {@code autoStartup} property for just the DLT container. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 9214af535e..d10032c484 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +101,7 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression this.expressionContext = expressionContext; } + @SuppressWarnings("deprecation") public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation, Object bean) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index f1ca235a95..5786bc658b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -133,6 +133,7 @@ && isNotFatalException(e) : destinationTopicHolder.getNextDestination(); } + @SuppressWarnings("deprecation") private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { return ((destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) || (destinationTopicHolder.getSourceDestination().isSingleTopicRetry())) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 8981ce748e..f03a0b8f8e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -55,6 +55,7 @@ public class DestinationTopicPropertiesFactory { private final KafkaOperations kafkaOperations; + @SuppressWarnings("deprecation") private final FixedDelayStrategy fixedDelayStrategy; private final DltStrategy dltStrategy; @@ -68,6 +69,7 @@ public class DestinationTopicPropertiesFactory { @Nullable private Boolean autoStartDltHandler; + @SuppressWarnings("deprecation") public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, BinaryExceptionClassifier exceptionClassifier, int numPartitions, KafkaOperations kafkaOperations, @@ -91,6 +93,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.maxAttempts = this.backOffValues.size() + 1; } + @SuppressWarnings("deprecation") public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, BinaryExceptionClassifier exceptionClassifier, int numPartitions, KafkaOperations kafkaOperations, @@ -132,6 +135,7 @@ private boolean isSingleTopicFixedDelay() { return isFixedDelay() && (isSingleTopicStrategy() || isSingleTopicSameIntervalTopicReuseStrategy()); } + @SuppressWarnings("deprecation") private boolean isSingleTopicStrategy() { return FixedDelayStrategy.SINGLE_TOPIC.equals(this.fixedDelayStrategy); } @@ -197,6 +201,7 @@ private BiPredicate getShouldRetryOn() { return (attempt, throwable) -> attempt < this.maxAttempts && this.exceptionClassifier.classify(throwable); } + @SuppressWarnings("deprecation") private DestinationTopic.Properties createRetryProperties(int index, BiPredicate shouldRetryOn) { int indexInBackoffValues = index - 1; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index fd9e094ccf..6c5fb5c4ff 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -71,6 +71,7 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; + @SuppressWarnings("deprecation") private FixedDelayStrategy fixedDelayStrategy = FixedDelayStrategy.MULTIPLE_TOPICS; private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 7f4846c644..620f267f2a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -51,6 +51,7 @@ class DestinationTopicPropertiesFactoryTests { private final int numPartitions = 0; + @SuppressWarnings("deprecation") private final FixedDelayStrategy fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; @@ -125,6 +126,7 @@ private void assertDltTopic(DestinationTopic.Properties dltProperties) { } @Test + @SuppressWarnings("deprecation") void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { // when ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); @@ -197,6 +199,7 @@ void shouldNotCreateDltProperties() { } @Test + @SuppressWarnings("deprecation") void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { // when @@ -239,6 +242,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { // when @@ -281,6 +285,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse } @Test + @SuppressWarnings("deprecation") void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { // when @@ -331,6 +336,7 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // setup @@ -352,6 +358,7 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // setup @@ -375,6 +382,7 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { } @Test + @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // setup @@ -405,6 +413,7 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { } @Test + @SuppressWarnings("deprecation") void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { // setup @@ -434,6 +443,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { // setup @@ -463,6 +473,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { // setup @@ -489,6 +500,7 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { assertThat(propertiesList.get(6).suffix()).isEqualTo(dltSuffix); } + @SuppressWarnings("deprecation") private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic, boolean expectedIsSingleTopicRetry) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 7c6978af3b..8cad3c7ace 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -117,6 +117,7 @@ public class DestinationTopicTests { new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + @SuppressWarnings("deprecation") protected DestinationTopic.Properties singleFixedRetryTopicProps4 = new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.SINGLE_TOPIC_RETRY, 4, 1, DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index 36c8f39635..425dfbd2c5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -247,6 +247,7 @@ static class FrameworkFatalTopicListener { @Autowired CountDownLatchContainer container; + @SuppressWarnings("deprecation") @RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(50)) @KafkaListener(topics = FRAMEWORK_FATAL_EXCEPTION_TOPIC) public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -330,6 +331,7 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate te } @Bean + @SuppressWarnings("deprecation") public RetryTopicConfiguration onlyTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index bf9a1ba02d..3823c37b6c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -478,6 +478,7 @@ static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { private static final String DLT_METHOD_NAME = "processDltMessage"; + @SuppressWarnings("deprecation") @Bean public RetryTopicConfiguration firstRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder From 7ccdf1675c6bf334b8c9405fe7a24367876ccb77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Tue, 28 Feb 2023 00:18:53 -0300 Subject: [PATCH 8/9] GH-2496 Ajusting new method isRetryTopic(). --- .../org/springframework/kafka/retrytopic/DestinationTopic.java | 3 ++- .../retrytopic/DestinationTopicPropertiesFactoryTests.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 77b71fdd61..3021bbc7bd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -225,7 +225,8 @@ public boolean isDltTopic() { } public boolean isRetryTopic() { - return Type.RETRY.equals(this.type) || Type.SINGLE_TOPIC_RETRY.equals(this.type); + return Type.RETRY.equals(this.type) || Type.SINGLE_TOPIC_RETRY.equals(this.type) + || Type.REUSABLE_RETRY_TOPIC.equals(this.type); } public String suffix() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 45fbf03ed6..2e08aaeb91 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -511,6 +511,7 @@ private void assertRetryTopic(DestinationTopic.Properties topicProperties, int m Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic, boolean expectedIsSingleTopicRetry) { assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); + assertThat(topicProperties.isRetryTopic()).isTrue(); DestinationTopic topic = new DestinationTopic("irrelevant" + topicProperties.suffix(), topicProperties); assertThat(topic.isDltTopic()).isFalse(); assertThat(topic.isSingleTopicRetry()).isEqualTo(expectedIsSingleTopicRetry); From c03860326533876290b8bd493f26c1399912d194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Lima?= Date: Wed, 1 Mar 2023 19:06:17 -0300 Subject: [PATCH 9/9] GH-2496: Documentation and formatting --- .../src/main/asciidoc/retrytopic.adoc | 8 +++++--- .../kafka/annotation/RetryableTopic.java | 1 + .../kafka/retrytopic/DestinationTopic.java | 18 ++++++++++++++++-- .../RetryTopicConfigurationBuilder.java | 4 +++- .../SameIntervalTopicReuseStrategy.java | 2 +- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index a673ca8397..4d7a081a53 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -651,7 +651,7 @@ IMPORTANT: Note that the blocking retries behavior is allowlist - you add the ex IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration. -==== Topic Amount +==== Topic Naming Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic. @@ -662,7 +662,7 @@ Examples: "my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix". NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n. -Therefore, by default the amount of retry topics is the configured `maxAttempts` minus 1. +Therefore, by default the number of retry topics is the configured `maxAttempts` minus 1. You can <>, choose whether to append <>, use a <>, and use a <> when using exponential backoffs. @@ -775,7 +775,7 @@ This "final" retry topic will be suffixed with the provided or default suffix, a NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. -The default behavior is to work with an amount of retry topics equal to the configured `maxAttempts` minus 1, and when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topics (corresponding to the `maxInterval` delay) being suffixed with an additional index. +The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index. For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be: @@ -797,6 +797,8 @@ When using the strategy that reuses the retry topic for the same intervals, in t * -retry-8000 * -retry-16000 +This will be the default in a future release. + ==== [source, java] ---- diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 92c27c7d32..e0137c2589 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -186,6 +186,7 @@ * {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over * the configuration in {@link #fixedDelayTopicStrategy()}. * @return the strategy. + * @since 3.0.4 */ SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 3021bbc7bd..18ffa3f67f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -297,7 +297,10 @@ public boolean isMainEndpoint() { } enum Type { - MAIN, RETRY, + MAIN, + + RETRY, + /** * A single retry topic for all retries. * @@ -305,6 +308,17 @@ enum Type { */ @Deprecated SINGLE_TOPIC_RETRY, - REUSABLE_RETRY_TOPIC, DLT, NO_OPS + + /** + * A retry topic reused along sequential retries + * with the same backoff interval. + * + * @since 3.0.4 + */ + REUSABLE_RETRY_TOPIC, + + DLT, + + NO_OPS } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 6c5fb5c4ff..fec32a7154 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -253,6 +253,7 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt * {@link #useSingleTopicForFixedDelays(FixedDelayStrategy)}. * @param sameIntervalTopicReuseStrategy the strategy. * @return the builder. + * @since 3.0.4 */ public RetryTopicConfigurationBuilder sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy) { this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; @@ -262,13 +263,14 @@ public RetryTopicConfigurationBuilder sameIntervalTopicReuseStrategy(SameInterva /** * Configure the use of a single retry topic * for the attempts that have the same backoff interval - * (as long as these attempts are in the middle of the chain). + * (as long as these attempts are in the end of the chain). * * Currently used only for the last retries of exponential backoff, * and in a future release this will dictate whether to use * a single retry topic for fixed backoff. * * @return the builder. + * @since 3.0.4 * @see SameIntervalTopicReuseStrategy * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java index ed6eff0929..8a066cdb3c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java @@ -27,7 +27,7 @@ * interval in the middle of the retry chain). * * @author João Lima - * @since 3.1 + * @since 3.0.4 * */ public enum SameIntervalTopicReuseStrategy {