From 8c7159084f6578e97c65e3fbe0a2f94f80724889 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Fri, 21 Apr 2023 15:32:31 +0530
Subject: [PATCH 1/2] Make queue capacity configurable

---
 .../java/com/splunk/hecclient/ConcurrentHec.java  |  2 +-
 src/main/java/com/splunk/hecclient/HecConfig.java | 10 ++++++++++
 .../kafka/connect/SplunkSinkConnectorConfig.java  | 12 ++++++++++--
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/src/main/java/com/splunk/hecclient/ConcurrentHec.java b/src/main/java/com/splunk/hecclient/ConcurrentHec.java
index 804b9ba1..7a57b8ed 100644
--- a/src/main/java/com/splunk/hecclient/ConcurrentHec.java
+++ b/src/main/java/com/splunk/hecclient/ConcurrentHec.java
@@ -37,7 +37,7 @@ public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, Poll
     }
 
     public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, PollerCallback cb, LoadBalancerInf loadBalancer) {
-        batches = new LinkedBlockingQueue<>(100);
+        batches = new LinkedBlockingQueue<>(config.getConcurrentHecQueueCapacity());
         ThreadFactory e = (Runnable r) -> new Thread(r, "Concurrent-HEC-worker");
         executorService = Executors.newFixedThreadPool(numberOfThreads, e);
         initHec(numberOfThreads, useAck, config, cb, loadBalancer);
diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java
index d29eccb5..b38d20bb 100644
--- a/src/main/java/com/splunk/hecclient/HecConfig.java
+++ b/src/main/java/com/splunk/hecclient/HecConfig.java
@@ -39,6 +39,7 @@ public final class HecConfig {
     private int lbPollInterval = 120; // in seconds
     private String kerberosPrincipal;
     private String kerberosKeytabPath;
+    private int concurrentHecQueueCapacity = 100;
 
     public HecConfig(List<String> uris, String token) {
         this.uris = uris;
@@ -101,6 +102,10 @@ public int getBackoffThresholdSeconds() {
         return backoffThresholdSeconds;
     }
 
+    public int getConcurrentHecQueueCapacity() {
+        return concurrentHecQueueCapacity;
+    }
+
     public boolean getHasCustomTrustStore() { return hasCustomTrustStore; }
 
     public String getTrustStorePath() { return trustStorePath; }
@@ -207,6 +212,11 @@ public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
         return this;
     }
 
+    public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
+        this.concurrentHecQueueCapacity = concurrentHecQueueCapacity;
+        return this;
+    }
+
     public boolean kerberosAuthEnabled() {
         return !kerberosPrincipal().isEmpty();
     }
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 2d18f587..3a0ecd95 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -55,6 +55,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
     static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
     static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
+    // only applicable when "splunk.hec.threads" > 1
+    static final String QUEUE_CAPACITY_CONF = "splunk.hec.concurrent.queue.capacity";
+
     // Acknowledgement Parameters
     // Use Ack
     static final String ACK_CONF = "splunk.hec.ack.enabled";
@@ -192,6 +195,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype";
     static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host";
+    static final String QUEUE_CAPACITY_DOC = "This setting controls the capacity of the in-memory queue of event batches shared by the concurrent HEC clients. Only applicable when splunk.hec.threads > 1. By default, this is set to 100.";
     // Load Balancer
     static final String LB_POLL_INTERVAL_DOC = "This setting controls the load balancer polling interval. By default, "
             + "this setting is 120 seconds.";
@@ -257,6 +261,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     final boolean enableTimestampExtraction;
     final String regex;
     final String timestampFormat;
+    final int queueCapacity;
 
     SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
         super(conf(), taskConfig);
@@ -312,6 +317,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         regex = getString(REGEX_CONF);
         timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
         validateRegexForTimestamp(regex);
+        queueCapacity = getInt(QUEUE_CAPACITY_CONF);
     }
 
 
@@ -360,7 +366,8 @@ public static ConfigDef conf() {
                 .define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
                 .define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
                 .define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
-                .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC);
+                .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
+                .define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
     }
 
     /**
@@ -384,7 +391,8 @@ public HecConfig getHecConfig() {
                 .setTrustStorePassword(trustStorePassword)
                 .setHasCustomTrustStore(hasTrustStorePath)
                 .setKerberosPrincipal(kerberosUserPrincipal)
-                .setKerberosKeytabPath(kerberosKeytabPath);
+                .setKerberosKeytabPath(kerberosKeytabPath)
+                .setConcurrentHecQueueCapacity(queueCapacity);
         return config;
     }

From 99b0606669cdc2893f65eeed659d69e1794bf447 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Wed, 26 Apr 2023 17:42:03 +0530
Subject: [PATCH 2/2] Add test cases

---
 .../connect/SplunkSinkConnectorConfig.java |  8 +++++-
 .../connect/SplunkSinkConnecterTest.java   | 28 +++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 3a0ecd95..84097c79 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -318,7 +318,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
         validateRegexForTimestamp(regex);
         queueCapacity = getInt(QUEUE_CAPACITY_CONF);
-
+        validateQueueCapacity(queueCapacity);
     }
 
 
@@ -558,6 +558,12 @@ private void validateRegexForTimestamp(String regex) {
         }
     }
 
+    private void validateQueueCapacity(int queueCapacity) {
+        if (queueCapacity <= 0) {
+            throw new ConfigException("queue capacity should be greater than 0, but was: " + queueCapacity);
+        }
+    }
+
     private static boolean getNamedGroupCandidates(String regex) {
         Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
         while (m.find()) {
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 4864bfaf..652ff6a4 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -261,6 +261,34 @@ public void testInvalidSplunkConfigurationsWithValidationEnabled() {
         Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
     }
 
+    @Test
+    public void testValidQueueCapacity() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("splunk.hec.concurrent.queue.capacity", "100");
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.success);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertDoesNotThrow(()->connector.validate(configs));
+    }
+
+    @Test
+    public void testInvalidQueueCapacity() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("splunk.hec.concurrent.queue.capacity", "-1");
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.success);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+    }
+
     private void addNecessaryConfigs(Map<String, String> configs) {
         configs.put(URI_CONF, TEST_URI);
         configs.put(TOKEN_CONF, "blah");