Skip to content

Commit 2cb7549

Browse files
authored
Merge pull request #387 from splunk/queue_capacity
Make queue capacity configurable
2 parents 3d96692 + b58840b commit 2cb7549

File tree

4 files changed

+56
-4
lines changed

4 files changed

+56
-4
lines changed

src/main/java/com/splunk/hecclient/ConcurrentHec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, Poll
3737
}
3838

3939
public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, PollerCallback cb, LoadBalancerInf loadBalancer) {
40-
batches = new LinkedBlockingQueue<>(100);
40+
batches = new LinkedBlockingQueue<>(config.getConcurrentHecQueueCapacity());
4141
ThreadFactory e = (Runnable r) -> new Thread(r, "Concurrent-HEC-worker");
4242
executorService = Executors.newFixedThreadPool(numberOfThreads, e);
4343
initHec(numberOfThreads, useAck, config, cb, loadBalancer);

src/main/java/com/splunk/hecclient/HecConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class HecConfig {
3939
private int lbPollInterval = 120; // in seconds
4040
private String kerberosPrincipal;
4141
private String kerberosKeytabPath;
42+
private int concurrentHecQueueCapacity = 100;
4243

4344
public HecConfig(List<String> uris, String token) {
4445
this.uris = uris;
@@ -101,6 +102,10 @@ public int getBackoffThresholdSeconds() {
101102
return backoffThresholdSeconds;
102103
}
103104

105+
public int getConcurrentHecQueueCapacity() {
106+
return concurrentHecQueueCapacity;
107+
}
108+
104109
public boolean getHasCustomTrustStore() { return hasCustomTrustStore; }
105110

106111
public String getTrustStorePath() { return trustStorePath; }
@@ -207,6 +212,11 @@ public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
207212
return this;
208213
}
209214

215+
public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
216+
this.concurrentHecQueueCapacity = concurrentHecQueueCapacity;
217+
return this;
218+
}
219+
210220
public boolean kerberosAuthEnabled() {
211221
return !kerberosPrincipal().isEmpty();
212222
}

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
5555
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
5656
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
5757
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
58+
// only applicable when "splunk.hec.threads" > 1
59+
static final String QUEUE_CAPACITY_CONF = "splunk.hec.concurrent.queue.capacity";
60+
5861
// Acknowledgement Parameters
5962
// Use Ack
6063
static final String ACK_CONF = "splunk.hec.ack.enabled";
@@ -192,6 +195,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
192195
static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype";
193196
static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host";
194197

198+
static final String QUEUE_CAPACITY_DOC = "This setting controls the queue capacity for concurrency";
195199
// Load Balancer
196200
static final String LB_POLL_INTERVAL_DOC = "This setting controls the load balancer polling interval. By default, "
197201
+ "this setting is 120 seconds.";
@@ -257,6 +261,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
257261
final boolean enableTimestampExtraction;
258262
final String regex;
259263
final String timestampFormat;
264+
final int queueCapacity;
260265

261266
SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
262267
super(conf(), taskConfig);
@@ -312,7 +317,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
312317
regex = getString(REGEX_CONF);
313318
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
314319
validateRegexForTimestamp(regex);
315-
320+
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
321+
validateQueueCapacity(queueCapacity);
316322
}
317323

318324

@@ -360,7 +366,8 @@ public static ConfigDef conf() {
360366
.define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
361367
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
362368
.define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
363-
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC);
369+
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
370+
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
364371
}
365372

366373
/**
@@ -384,7 +391,8 @@ public HecConfig getHecConfig() {
384391
.setTrustStorePassword(trustStorePassword)
385392
.setHasCustomTrustStore(hasTrustStorePath)
386393
.setKerberosPrincipal(kerberosUserPrincipal)
387-
.setKerberosKeytabPath(kerberosKeytabPath);
394+
.setKerberosKeytabPath(kerberosKeytabPath)
395+
.setConcurrentHecQueueCapacity(queueCapacity);
388396
return config;
389397
}
390398

@@ -550,6 +558,12 @@ private void validateRegexForTimestamp(String regex) {
550558
}
551559
}
552560

561+
private void validateQueueCapacity(int queueCapacity) {
562+
if (queueCapacity <= 0) {
563+
throw new ConfigException("queue capacity should be greater than " + queueCapacity);
564+
}
565+
}
566+
553567
private static boolean getNamedGroupCandidates(String regex) {
554568
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
555569
while (m.find()) {

src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,34 @@ public void testInvalidSplunkConfigurationsWithValidationEnabled() {
261261
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
262262
}
263263

264+
@Test
265+
public void testValidQueueCapacity() {
266+
final Map<String, String> configs = new HashMap<>();
267+
addNecessaryConfigs(configs);
268+
SplunkSinkConnector connector = new SplunkSinkConnector();
269+
configs.put("splunk.hec.concurrent.queue.capacity", "100");
270+
configs.put("topics", "b");
271+
configs.put("splunk.indexes", "b");
272+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
273+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
274+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
275+
Assertions.assertDoesNotThrow(()->connector.validate(configs));
276+
}
277+
278+
@Test
279+
public void testInvalidQueueCapacity() {
280+
final Map<String, String> configs = new HashMap<>();
281+
addNecessaryConfigs(configs);
282+
SplunkSinkConnector connector = new SplunkSinkConnector();
283+
configs.put("splunk.hec.concurrent.queue.capacity", "-1");
284+
configs.put("topics", "b");
285+
configs.put("splunk.indexes", "b");
286+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
287+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
288+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
289+
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
290+
}
291+
264292
private void addNecessaryConfigs(Map<String, String> configs) {
265293
configs.put(URI_CONF, TEST_URI);
266294
configs.put(TOKEN_CONF, "blah");

0 commit comments

Comments
 (0)