Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pulsar.max-backoff-interval-sec=10
pulsar.consumer-name-delimiter=
pulsar.namespace=default
pulsar.tenant=public
pulsar.transactionCoordinatorEnabled=false

#Consumer
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public PulsarClient pulsarClient() throws PulsarClientException {
.operationTimeout(pulsarProperties.getOperationTimeoutSec(), TimeUnit.SECONDS)
.startingBackoffInterval(pulsarProperties.getStartingBackoffIntervalMs(), TimeUnit.MILLISECONDS)
.maxBackoffInterval(pulsarProperties.getMaxBackoffIntervalSec(), TimeUnit.SECONDS)
.enableTransaction(pulsarProperties.getTransactionCoordinatorEnabled())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class PulsarProperties {
private String consumerNameDelimiter = "";
private String namespace = "default";
private String tenant = "public";
private Boolean transactionCoordinatorEnabled = false;

public String getServiceUrl() {
return serviceUrl;
Expand Down Expand Up @@ -112,4 +113,12 @@ public String getTenant() {
public void setTenant(String tenant) {
this.tenant = tenant;
}

public Boolean getTransactionCoordinatorEnabled() {
return transactionCoordinatorEnabled;
}

public void setTransactionCoordinatorEnabled(Boolean transactionCoordinatorEnabled) {
this.transactionCoordinatorEnabled = transactionCoordinatorEnabled;
}
}
2 changes: 2 additions & 0 deletions src/main/resources/application-test.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pulsar.consumer.default.ack-timeout-ms=1000
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=1

pulsar.transactionCoordinatorEnabled=true

#Custom Topics
my.custom.topic.name=custom-topic-name-from-app-prop
2 changes: 1 addition & 1 deletion src/test/java/io/github/majusko/pulsar/TestConsumers.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestConsumers {
public AtomicInteger failTwiceRetryCount = new AtomicInteger(0);
public AtomicInteger topicOverflowDueToExceptionRetryCount = new AtomicInteger(0);

@PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, serialization = Serialization.JSON)
@PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, subscriptionType = SubscriptionType.Shared)
public void topicOneListener(MyMsg myMsg) {
Assertions.assertNotNull(myMsg);
mockTopicListenerReceived.set(true);
Expand Down