Skip to content

Commit dd8f7d3

Browse files
author
Vihas Splunk
committed
Make queue capacity configurable
1 parent d211a80 commit dd8f7d3

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

src/main/java/com/splunk/hecclient/ConcurrentHec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, Poll
3737
}
3838

3939
public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, PollerCallback cb, LoadBalancerInf loadBalancer) {
40-
batches = new LinkedBlockingQueue<>(100);
40+
batches = new LinkedBlockingQueue<>(config.getConcurrentHecQueueCapacity());
4141
ThreadFactory e = (Runnable r) -> new Thread(r, "Concurrent-HEC-worker");
4242
executorService = Executors.newFixedThreadPool(numberOfThreads, e);
4343
initHec(numberOfThreads, useAck, config, cb, loadBalancer);

src/main/java/com/splunk/hecclient/HecConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class HecConfig {
3939
private int lbPollInterval = 120; // in seconds
4040
private String kerberosPrincipal;
4141
private String kerberosKeytabPath;
42+
private int concurrentHecQueueCapacity = 100;
4243

4344
public HecConfig(List<String> uris, String token) {
4445
this.uris = uris;
@@ -101,6 +102,10 @@ public int getBackoffThresholdSeconds() {
101102
return backoffThresholdSeconds;
102103
}
103104

105+
public int getConcurrentHecQueueCapacity() {
106+
return concurrentHecQueueCapacity;
107+
}
108+
104109
public boolean getHasCustomTrustStore() { return hasCustomTrustStore; }
105110

106111
public String getTrustStorePath() { return trustStorePath; }
@@ -207,6 +212,11 @@ public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
207212
return this;
208213
}
209214

215+
public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
216+
this.concurrentHecQueueCapacity = concurrentHecQueueCapacity;
217+
return this;
218+
}
219+
210220
public boolean kerberosAuthEnabled() {
211221
return !kerberosPrincipal().isEmpty();
212222
}

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
5555
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
5656
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
5757
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
58+
// only applicable when "splunk.hec.threads" > 1
59+
static final String QUEUE_CAPACITY_CONF = "splunk.hec.concurrent.queue.capacity";
60+
5861
// Acknowledgement Parameters
5962
// Use Ack
6063
static final String ACK_CONF = "splunk.hec.ack.enabled";
@@ -192,6 +195,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
192195
static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype";
193196
static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host";
194197

198+
static final String QUEUE_CAPACITY_DOC = "This setting controls the queue capacity for concurrency";
195199
// Load Balancer
196200
static final String LB_POLL_INTERVAL_DOC = "This setting controls the load balancer polling interval. By default, "
197201
+ "this setting is 120 seconds.";
@@ -257,6 +261,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
257261
final boolean enableTimestampExtraction;
258262
final String regex;
259263
final String timestampFormat;
264+
final int queueCapacity;
260265

261266
SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
262267
super(conf(), taskConfig);
@@ -312,6 +317,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
312317
regex = getString(REGEX_CONF);
313318
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
314319
validateRegexForTimestamp(regex);
320+
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
315321

316322
}
317323

@@ -360,7 +366,8 @@ public static ConfigDef conf() {
360366
.define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
361367
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
362368
.define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
363-
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC);
369+
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
370+
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
364371
}
365372

366373
/**
@@ -384,7 +391,8 @@ public HecConfig getHecConfig() {
384391
.setTrustStorePassword(trustStorePassword)
385392
.setHasCustomTrustStore(hasTrustStorePath)
386393
.setKerberosPrincipal(kerberosUserPrincipal)
387-
.setKerberosKeytabPath(kerberosKeytabPath);
394+
.setKerberosKeytabPath(kerberosKeytabPath)
395+
.setConcurrentHecQueueCapacity(queueCapacity);
388396
return config;
389397
}
390398

0 commit comments

Comments
 (0)