From a8f065c85e4f628b196a01a29efc5a246dd001ab Mon Sep 17 00:00:00 2001 From: majusko Date: Wed, 19 May 2021 22:41:34 +0200 Subject: [PATCH] Added transactional support to configuration. --- README.md | 1 + .../github/majusko/pulsar/PulsarAutoConfiguration.java | 1 + .../majusko/pulsar/properties/PulsarProperties.java | 9 +++++++++ src/main/resources/application-test.properties | 2 ++ .../java/io/github/majusko/pulsar/TestConsumers.java | 2 +- 5 files changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b62bea8..11ced45 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ pulsar.max-backoff-interval-sec=10 pulsar.consumer-name-delimiter= pulsar.namespace=default pulsar.tenant=public +pulsar.transactionCoordinatorEnabled=false #Consumer pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1 diff --git a/src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java b/src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java index ebfa26f..5df6037 100644 --- a/src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java +++ b/src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java @@ -36,6 +36,7 @@ public PulsarClient pulsarClient() throws PulsarClientException { .operationTimeout(pulsarProperties.getOperationTimeoutSec(), TimeUnit.SECONDS) .startingBackoffInterval(pulsarProperties.getStartingBackoffIntervalMs(), TimeUnit.MILLISECONDS) .maxBackoffInterval(pulsarProperties.getMaxBackoffIntervalSec(), TimeUnit.SECONDS) + .enableTransaction(pulsarProperties.getTransactionCoordinatorEnabled()) .build(); } } diff --git a/src/main/java/io/github/majusko/pulsar/properties/PulsarProperties.java b/src/main/java/io/github/majusko/pulsar/properties/PulsarProperties.java index f47f3fd..5a8c8d1 100644 --- a/src/main/java/io/github/majusko/pulsar/properties/PulsarProperties.java +++ b/src/main/java/io/github/majusko/pulsar/properties/PulsarProperties.java @@ -16,6 +16,7 @@ public class PulsarProperties { private String consumerNameDelimiter = ""; private String namespace = "default"; private String tenant = "public"; + private Boolean transactionCoordinatorEnabled = false; public String getServiceUrl() { return serviceUrl; @@ -112,4 +113,12 @@ public String getTenant() { public void setTenant(String tenant) { this.tenant = tenant; } + + public Boolean getTransactionCoordinatorEnabled() { + return transactionCoordinatorEnabled; + } + + public void setTransactionCoordinatorEnabled(Boolean transactionCoordinatorEnabled) { + this.transactionCoordinatorEnabled = transactionCoordinatorEnabled; + } } \ No newline at end of file diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index dba2c8e..3d7158f 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -1,5 +1,7 @@ pulsar.consumer.default.ack-timeout-ms=1000 pulsar.consumer.default.dead-letter-policy-max-redeliver-count=1 +pulsar.transactionCoordinatorEnabled=true + #Custom Topics my.custom.topic.name=custom-topic-name-from-app-prop \ No newline at end of file diff --git a/src/test/java/io/github/majusko/pulsar/TestConsumers.java b/src/test/java/io/github/majusko/pulsar/TestConsumers.java index 202d0c3..c94bb0f 100644 --- a/src/test/java/io/github/majusko/pulsar/TestConsumers.java +++ b/src/test/java/io/github/majusko/pulsar/TestConsumers.java @@ -29,7 +29,7 @@ public class TestConsumers { public AtomicInteger failTwiceRetryCount = new AtomicInteger(0); public AtomicInteger topicOverflowDueToExceptionRetryCount = new AtomicInteger(0); - @PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, serialization = Serialization.JSON) + @PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, subscriptionType = SubscriptionType.Shared) public void topicOneListener(MyMsg myMsg) { Assertions.assertNotNull(myMsg); mockTopicListenerReceived.set(true);