From 1a9d63d6a2ff6534d6a1a00affbde7400953ba13 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Fri, 26 Jan 2024 13:29:30 +0800 Subject: [PATCH 1/3] Polish `DestinationTopicPropertiesFactory` and `DestinationTopicPropertiesFactoryTests` --- .../DestinationTopicPropertiesFactory.java | 178 ++++++------------ ...estinationTopicPropertiesFactoryTests.java | 51 ++--- 2 files changed, 80 insertions(+), 149 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 462e63b40d..034d8a4ae4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -16,10 +16,8 @@ package org.springframework.kafka.retrytopic; -import java.util.Arrays; import java.util.List; import java.util.function.BiPredicate; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.springframework.classify.BinaryExceptionClassifier; @@ -36,6 +34,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author João Lima + * @author Wang Zhiyang * @since 2.7 * */ @@ -47,19 +46,21 @@ public class DestinationTopicPropertiesFactory { private final List backOffValues; - private final BinaryExceptionClassifier exceptionClassifier; - private final int numPartitions; private final int maxAttempts; - private final KafkaOperations kafkaOperations; + private final boolean isSameIntervalReuse; - private final DltStrategy dltStrategy; + private final boolean isFixedDelay; + + private final int retryTopicsAmount; + + private final BiPredicate shouldRetryOn; - private final TopicSuffixingStrategy topicSuffixingStrategy; + private final KafkaOperations kafkaOperations; - private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy; + private final DltStrategy dltStrategy; private final long timeout; @@ -90,15 +91,19 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; - this.exceptionClassifier = exceptionClassifier; this.numPartitions = numPartitions; - this.topicSuffixingStrategy = topicSuffixingStrategy; - this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); this.backOffValues = backOffValues; - // Max Attempts include the initial try. - this.maxAttempts = this.backOffValues.size() + 1; + // Max Attempts to include the initial try. + int backOffValuesSize = this.backOffValues.size(); + this.maxAttempts = backOffValuesSize + 1; + this.shouldRetryOn = (attempt, throwable) -> attempt < this.maxAttempts + && exceptionClassifier.classify(throwable); + this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); + this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts(); + this.isFixedDelay = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(topicSuffixingStrategy) + || backOffValuesSize > 1 && backOffValues.stream().distinct().count() == 1; } /** @@ -113,71 +118,26 @@ public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean a } public List createProperties() { - return isSingleTopicFixedDelay() - ? createPropertiesForFixedDelaySingleTopic() - : createPropertiesForDefaultTopicStrategy(); - } - - private List createPropertiesForFixedDelaySingleTopic() { - return isNoDltStrategy() - ? Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, getShouldRetryOn())) - : Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, getShouldRetryOn()), - createDltProperties()); - } - - private boolean isSingleTopicFixedDelay() { - return (this.backOffValues.size() == 1 || isFixedDelay()) && isSingleTopicSameIntervalTopicReuseStrategy(); - } - - private boolean isSingleTopicSameIntervalTopicReuseStrategy() { - return SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(this.sameIntervalTopicReuseStrategy); - } - - private List createPropertiesForDefaultTopicStrategy() { - - int retryTopicsAmount = retryTopicsAmount(); - - return IntStream.rangeClosed(0, isNoDltStrategy() - ? retryTopicsAmount - : retryTopicsAmount + 1) - .mapToObj(this::createTopicProperties) - .collect(Collectors.toList()); - } - - int retryTopicsAmount() { - return this.backOffValues.size() - reusableTopicAttempts(); - } - - private int reusableTopicAttempts() { - return this.backOffValues.size() > 0 - ? !isFixedDelay() - ? isSingleTopicSameIntervalTopicReuseStrategy() - // Assuming that duplicates are always in - // the end of the list. - ? amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1 - : 0 - : 0 - : 0; - } - - private boolean isNoDltStrategy() { - return DltStrategy.NO_DLT.equals(this.dltStrategy); + int topicAmount = DltStrategy.NO_DLT.equals(this.dltStrategy) + ? this.retryTopicsAmount + : this.retryTopicsAmount + 1; + return IntStream + .rangeClosed(0, topicAmount) + .mapToObj(this::createTopicProperties) + .toList(); } private DestinationTopic.Properties createTopicProperties(int index) { - BiPredicate shouldRetryOn = getShouldRetryOn(); return index == 0 ? createMainTopicProperties() - : (index <= this.retryTopicsAmount()) - ? createRetryProperties(index, shouldRetryOn) + : index <= this.retryTopicsAmount + ? createRetryProperties(index) : createDltProperties(); } private DestinationTopic.Properties createMainTopicProperties() { return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts, - this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout); + this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, this.timeout); } private DestinationTopic.Properties createDltProperties() { @@ -186,49 +146,42 @@ private DestinationTopic.Properties createDltProperties() { this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); } - private BiPredicate getShouldRetryOn() { - return (attempt, throwable) -> attempt < this.maxAttempts && this.exceptionClassifier.classify(throwable); - } - - private DestinationTopic.Properties createRetryProperties(int index, - BiPredicate shouldRetryOn) { - + private DestinationTopic.Properties createRetryProperties(int index) { int indexInBackoffValues = index - 1; - Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); - DestinationTopic.Type topicTypeToUse = isDelayWithReusedTopic(thisBackOffValue) - ? Type.REUSABLE_RETRY_TOPIC - : Type.RETRY; - return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues, - getTopicSuffix(indexInBackoffValues, thisBackOffValue)); - } - - private String getTopicSuffix(int indexInBackoffValues, Long thisBackOffValue) { - return isSingleTopicFixedDelay() - ? this.destinationTopicSuffixes.getRetrySuffix() - : isSuffixWithIndexStrategy() || isFixedDelay() - ? joinWithRetrySuffix(indexInBackoffValues) - : hasDuplicates(thisBackOffValue) - ? joinWithRetrySuffix(thisBackOffValue) - .concat(suffixForRepeatedInterval(indexInBackoffValues, thisBackOffValue)) - : joinWithRetrySuffix(thisBackOffValue); - } - - private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBackOffValue) { - return isSingleTopicSameIntervalTopicReuseStrategy() - ? "" - : "-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue); + long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); + return createProperties(thisBackOffValue, getTopicSuffix(indexInBackoffValues, thisBackOffValue)); } - private boolean isDelayWithReusedTopic(Long backoffValue) { - return hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy(); + private String getTopicSuffix(int indexInBackoffValues, long thisBackOffValue) { + if (this.isSameIntervalReuse && this.retryTopicsAmount == 1) { + return this.destinationTopicSuffixes.getRetrySuffix(); + } + else if (this.isFixedDelay) { + return joinWithRetrySuffix(indexInBackoffValues); + } + else { + String retrySuffix = joinWithRetrySuffix(thisBackOffValue); + if (!this.isSameIntervalReuse && hasDuplicates(thisBackOffValue)) { + return retrySuffix.concat("-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue)); + } + return retrySuffix; + } } private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) { return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue); } - private boolean isSuffixWithIndexStrategy() { - return TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(this.topicSuffixingStrategy); + private DestinationTopic.Type getDestinationTopicType(Long backOffValue) { + return this.isSameIntervalReuse && hasDuplicates(backOffValue) ? Type.REUSABLE_RETRY_TOPIC : Type.RETRY; + } + + private int reusableTopicAttempts() { + if (this.isSameIntervalReuse && this.backOffValues.size() > 1) { + // Assuming that duplicates are always at the end of the list. + return amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1; + } + return 0; } private boolean hasDuplicates(Long thisBackOffValue) { @@ -238,22 +191,15 @@ private boolean hasDuplicates(Long thisBackOffValue) { private int amountOfDuplicates(Long thisBackOffValue) { return Long.valueOf(this.backOffValues .stream() - .filter(value -> value.equals(thisBackOffValue)) - .count()).intValue(); - } - - private DestinationTopic.Properties createProperties(DestinationTopic.Type topicType, - BiPredicate shouldRetryOn, - int indexInBackoffValues, - String suffix) { - return new DestinationTopic.Properties(this.backOffValues.get(indexInBackoffValues), suffix, - topicType, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, shouldRetryOn, this.timeout); + .filter(thisBackOffValue::equals) + .count()) + .intValue(); } - private boolean isFixedDelay() { - // If all values are the same, such as in NoBackOffPolicy and FixedBackoffPolicy - return this.backOffValues.size() > 1 && this.backOffValues.stream().distinct().count() == 1; + private DestinationTopic.Properties createProperties(long delayMs, String suffix) { + return new DestinationTopic.Properties(delayMs, suffix, getDestinationTopicType(delayMs), + this.maxAttempts, this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, + this.timeout); } private String joinWithRetrySuffix(long parameter) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index e83bcec19f..f848da0a95 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +37,7 @@ /** * @author Tomaz Fernandes + * @author Wang Zhiyang * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -124,7 +124,6 @@ private void assertDltTopic(DestinationTopic.Properties dltProperties) { } @Test - @SuppressWarnings("deprecation") void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { // when ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); @@ -143,7 +142,7 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List destinationTopicList = propertiesList .stream() .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); + .toList(); // then assertThat(propertiesList.size() == 4).isTrue(); @@ -197,7 +196,6 @@ void shouldNotCreateDltProperties() { } @Test - @SuppressWarnings("deprecation") void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { // when @@ -216,7 +214,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse List destinationTopicList = propertiesList .stream() .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); + .toList(); // then assertThat(propertiesList.size() == 3).isTrue(); @@ -239,7 +237,6 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse } @Test - @SuppressWarnings("deprecation") void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { // when @@ -258,7 +255,7 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { List destinationTopicList = propertiesList .stream() .map(properties -> new DestinationTopic("mainTopic" + properties.suffix(), properties)) - .collect(Collectors.toList()); + .toList(); // then assertThat(propertiesList.size() == 4).isTrue(); @@ -288,7 +285,6 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { } @Test - @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // setup @@ -305,11 +301,10 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // then IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) - .isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); + .isEqualTo(retryTopicSuffix + "-" + (index - 1))); } @Test - @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // setup @@ -328,11 +323,10 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // then IntStream.range(1, maxAttempts) .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + - "-" + String.valueOf(index - 1))); + "-" + (index - 1))); } @Test - @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // setup @@ -351,7 +345,6 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount() == 4).isTrue(); assertThat(propertiesList.size() == 6).isTrue(); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); assertThat(propertiesList.get(1).suffix()).isEqualTo(retryTopicSuffix + "-1000"); @@ -362,7 +355,6 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { } @Test - @SuppressWarnings("deprecation") void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { // setup @@ -381,17 +373,15 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount()).isEqualTo(3); assertThat(propertiesList.size()).isEqualTo(5); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false, false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true, false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true); assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } @Test - @SuppressWarnings("deprecation") void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { // setup @@ -410,17 +400,15 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount()).isEqualTo(3); assertThat(propertiesList.size()).isEqualTo(5); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false, false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false, false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true, false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true); assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); } @Test - @SuppressWarnings("deprecation") void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { // setup @@ -435,21 +423,18 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { List propertiesList = factory.createProperties(); // then - assertThat(factory.retryTopicsAmount()).isEqualTo(5); assertThat(propertiesList.size()).isEqualTo(7); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); - assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); - assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false, false); - assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false, false); - assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false, false); - assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false, false); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false); + assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false); + assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false); assertThat(propertiesList.get(6).suffix()).isEqualTo(dltSuffix); } - @SuppressWarnings("deprecation") private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, - Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic, - boolean expectedIsSingleTopicRetry) { + Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic) { assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); assertThat(topicProperties.isRetryTopic()).isTrue(); DestinationTopic topic = new DestinationTopic("irrelevant" + topicProperties.suffix(), topicProperties); From c85a8ba2501172eb2392656f5bf1a4c7cf5c534b Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Sat, 27 Jan 2024 08:39:21 +0800 Subject: [PATCH 2/3] change copyright year to 2024 --- .../kafka/retrytopic/DestinationTopicPropertiesFactory.java | 4 ++-- .../retrytopic/DestinationTopicPropertiesFactoryTests.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 034d8a4ae4..f3190f0beb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,8 +95,8 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); this.backOffValues = backOffValues; - // Max Attempts to include the initial try. int backOffValuesSize = this.backOffValues.size(); + // Max Attempts to include the initial try. this.maxAttempts = backOffValuesSize + 1; this.shouldRetryOn = (attempt, throwable) -> attempt < this.maxAttempts && exceptionClassifier.classify(throwable); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index f848da0a95..82ea3fc5f6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 598322aac8dd9a01129ba9a540c068023cee6a29 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Mon, 29 Jan 2024 16:09:17 +0800 Subject: [PATCH 3/3] remove method `getIndexInBackoffValues` cause ambiguity and polish method `createProperties` --- .../DestinationTopicPropertiesFactory.java | 55 ++++++++----------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index f3190f0beb..1119bf39e9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -16,9 +16,10 @@ package org.springframework.kafka.retrytopic; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.BiPredicate; -import java.util.stream.IntStream; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.kafka.core.KafkaOperations; @@ -96,14 +97,14 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); this.backOffValues = backOffValues; int backOffValuesSize = this.backOffValues.size(); + this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); + this.isFixedDelay = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(topicSuffixingStrategy) + || backOffValuesSize > 1 && backOffValues.stream().distinct().count() == 1; // Max Attempts to include the initial try. this.maxAttempts = backOffValuesSize + 1; this.shouldRetryOn = (attempt, throwable) -> attempt < this.maxAttempts && exceptionClassifier.classify(throwable); - this.isSameIntervalReuse = SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(sameIntervalTopicReuseStrategy); this.retryTopicsAmount = backOffValuesSize - reusableTopicAttempts(); - this.isFixedDelay = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(topicSuffixingStrategy) - || backOffValuesSize > 1 && backOffValues.stream().distinct().count() == 1; } /** @@ -118,21 +119,15 @@ public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean a } public List createProperties() { - int topicAmount = DltStrategy.NO_DLT.equals(this.dltStrategy) - ? this.retryTopicsAmount - : this.retryTopicsAmount + 1; - return IntStream - .rangeClosed(0, topicAmount) - .mapToObj(this::createTopicProperties) - .toList(); - } - - private DestinationTopic.Properties createTopicProperties(int index) { - return index == 0 - ? createMainTopicProperties() - : index <= this.retryTopicsAmount - ? createRetryProperties(index) - : createDltProperties(); + List list = new ArrayList<>(this.retryTopicsAmount + 2); + list.add(createMainTopicProperties()); + for (int backOffIndex = 0; backOffIndex < this.retryTopicsAmount; backOffIndex++) { + list.add(createRetryProperties(backOffIndex)); + } + if (!DltStrategy.NO_DLT.equals(this.dltStrategy)) { + list.add(createDltProperties()); + } + return Collections.unmodifiableList(list); } private DestinationTopic.Properties createMainTopicProperties() { @@ -146,32 +141,27 @@ private DestinationTopic.Properties createDltProperties() { this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); } - private DestinationTopic.Properties createRetryProperties(int index) { - int indexInBackoffValues = index - 1; - long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); - return createProperties(thisBackOffValue, getTopicSuffix(indexInBackoffValues, thisBackOffValue)); + private DestinationTopic.Properties createRetryProperties(int backOffIndex) { + long thisBackOffValue = this.backOffValues.get(backOffIndex); + return createProperties(thisBackOffValue, getTopicSuffix(backOffIndex, thisBackOffValue)); } - private String getTopicSuffix(int indexInBackoffValues, long thisBackOffValue) { + private String getTopicSuffix(int backOffIndex, long thisBackOffValue) { if (this.isSameIntervalReuse && this.retryTopicsAmount == 1) { return this.destinationTopicSuffixes.getRetrySuffix(); } else if (this.isFixedDelay) { - return joinWithRetrySuffix(indexInBackoffValues); + return joinWithRetrySuffix(backOffIndex); } else { String retrySuffix = joinWithRetrySuffix(thisBackOffValue); if (!this.isSameIntervalReuse && hasDuplicates(thisBackOffValue)) { - return retrySuffix.concat("-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue)); + return retrySuffix.concat("-" + (backOffIndex - this.backOffValues.indexOf(thisBackOffValue))); } return retrySuffix; } } - private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) { - return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue); - } - private DestinationTopic.Type getDestinationTopicType(Long backOffValue) { return this.isSameIntervalReuse && hasDuplicates(backOffValue) ? Type.REUSABLE_RETRY_TOPIC : Type.RETRY; } @@ -197,9 +187,8 @@ private int amountOfDuplicates(Long thisBackOffValue) { } private DestinationTopic.Properties createProperties(long delayMs, String suffix) { - return new DestinationTopic.Properties(delayMs, suffix, getDestinationTopicType(delayMs), - this.maxAttempts, this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, - this.timeout); + return new DestinationTopic.Properties(delayMs, suffix, getDestinationTopicType(delayMs), this.maxAttempts, + this.numPartitions, this.dltStrategy, this.kafkaOperations, this.shouldRetryOn, this.timeout); } private String joinWithRetrySuffix(long parameter) {