Skip to content

Commit a01cc3f

Browse files
author
Vihas Splunk
committed
Make queue capacity configurable
1 parent d01add4 commit a01cc3f

File tree

3 files changed

+17
-1
lines changed

3 files changed

+17
-1
lines changed

src/main/java/com/splunk/hecclient/ConcurrentHec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, Poll
3737
}
3838

3939
public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, PollerCallback cb, LoadBalancerInf loadBalancer) {
40-
batches = new LinkedBlockingQueue<>(100);
40+
batches = new LinkedBlockingQueue<>(config.getConcurrentHecQueueCapacity());
4141
ThreadFactory e = (Runnable r) -> new Thread(r, "Concurrent-HEC-worker");
4242
executorService = Executors.newFixedThreadPool(numberOfThreads, e);
4343
initHec(numberOfThreads, useAck, config, cb, loadBalancer);

src/main/java/com/splunk/hecclient/HecConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class HecConfig {
3939
private int lbPollInterval = 120; // in seconds
4040
private String kerberosPrincipal;
4141
private String kerberosKeytabPath;
42+
private int concurrentHecQueueCapacity = 100;
4243

4344
public HecConfig(List<String> uris, String token) {
4445
this.uris = uris;
@@ -101,6 +102,10 @@ public int getBackoffThresholdSeconds() {
101102
return backoffThresholdSeconds;
102103
}
103104

105+
public int getConcurrentHecQueueCapacity() {
106+
return concurrentHecQueueCapacity;
107+
}
108+
104109
public boolean getHasCustomTrustStore() { return hasCustomTrustStore; }
105110

106111
public String getTrustStorePath() { return trustStorePath; }
@@ -207,6 +212,11 @@ public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
207212
return this;
208213
}
209214

215+
public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
216+
this.concurrentHecQueueCapacity = concurrentHecQueueCapacity;
217+
return this;
218+
}
219+
210220
public boolean kerberosAuthEnabled() {
211221
return !kerberosPrincipal().isEmpty();
212222
}

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
5555
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
5656
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
5757
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
58+
// only applicable when "splunk.hec.threads" > 1
59+
static final String QUEUE_CAPACITY_CONF = "splunk.hec.concurrent.queue.capacity";
60+
5861
// Acknowledgement Parameters
5962
// Use Ack
6063
static final String ACK_CONF = "splunk.hec.ack.enabled";
@@ -257,6 +260,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
257260
final boolean enableTimestampExtraction;
258261
final String regex;
259262
final String timestampFormat;
263+
final int queueCapacity;
260264

261265
SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
262266
super(conf(), taskConfig);
@@ -312,6 +316,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
312316
regex = getString(REGEX_CONF);
313317
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
314318
validateRegexForTimestamp(regex);
319+
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
315320

316321
}
317322

@@ -385,6 +390,7 @@ public HecConfig getHecConfig() {
385390
.setHasCustomTrustStore(hasTrustStorePath)
386391
.setKerberosPrincipal(kerberosUserPrincipal)
387392
.setKerberosKeytabPath(kerberosKeytabPath);
393+
.setConcurrentHecQueueCapacity(queueCapacity);
388394
return config;
389395
}
390396

0 commit comments

Comments
 (0)