diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml
index b16211a6..2306467f 100644
--- a/.github/workflows/ci_build_test.yaml
+++ b/.github/workflows/ci_build_test.yaml
@@ -15,6 +15,10 @@ on:
description: Publish token for Semgrep
required: true
+permissions:
+ checks: write
+ pull-requests: write
+
jobs:
workflow_approval:
name: Approve workflow
@@ -99,7 +103,7 @@ jobs:
path: /tmp/splunk-kafka-connect*.jar
- name: Publish Unit Test Results
- uses: EnricoMi/publish-unit-test-result-action/composite@v1
+ uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
check_name: Unit Test Results
diff --git a/pom.xml b/pom.xml
index c4c4988c..36bc2e8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,9 +15,12 @@
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <junit.version>4.13.2</junit.version>
-    <junit.jupiter.version>5.3.2</junit.jupiter.version>
-    <junit.vintage.version>5.3.2</junit.vintage.version>
-    <junit.platform.version>1.3.2</junit.platform.version>
+    <junit.jupiter.version>5.9.2</junit.jupiter.version>
+    <junit.vintage.version>5.9.2</junit.vintage.version>
+    <junit.platform.version>1.9.2</junit.platform.version>
+    <jackson.version>2.14.2</jackson.version>
+    <kafka.version>3.4.0</kafka.version>
+    <slf4j.version>2.0.7</slf4j.version>
@@ -26,20 +29,20 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>2.12.6</version>
+            <version>${jackson.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.12.7.1</version>
+            <version>${jackson.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>connect-api</artifactId>
-            <version>2.8.1</version>
+            <version>${kafka.version}</version>
             <scope>compile</scope>
         </dependency>
@@ -79,7 +82,7 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>4.5.13</version>
+            <version>4.5.14</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -108,14 +111,14 @@
         <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
-            <version>1.14</version>
+            <version>1.15</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
-            <version>2.17.2</version>
+            <version>2.20.0</version>
             <scope>compile</scope>
         </dependency>
@@ -123,59 +126,47 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
-            <version>1.7.26</version>
+            <version>${slf4j.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.7.26</version>
+            <version>${slf4j.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
-            <version>1.4</version>
+            <version>1.5.0</version>
         </dependency>
         <dependency>
             <groupId>org.apiguardian</groupId>
             <artifactId>apiguardian-api</artifactId>
-            <version>1.0.0</version>
+            <version>1.1.2</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
-            <version>3.7</version>
+            <version>3.12.0</version>
             <scope>compile</scope>
         </dependency>
-        <dependency>
-            <groupId>io.confluent</groupId>
-            <artifactId>kafka-connect-protobuf-converter</artifactId>
-            <version>7.1.1</version>
-        </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
-            <version>3.21.7</version>
+            <version>3.22.2</version>
         </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>2.9.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlin</groupId>
-            <artifactId>kotlin-stdlib</artifactId>
-            <version>1.7.0</version>
+            <version>2.10.1</version>
         </dependency>
@@ -197,7 +188,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jxr-plugin</artifactId>
-                <version>2.3</version>
+                <version>3.3.0</version>
             </plugin>
@@ -207,7 +198,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <version>3.1.0</version>
+                <version>3.4.1</version>
@@ -228,7 +219,7 @@
             <plugin>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.7.0</version>
+                <version>3.11.0</version>
                 <configuration>
                     <source>${java.version}</source>
                     <target>${java.version}</target>
@@ -240,7 +231,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
-                <version>2.17</version>
+                <version>3.2.1</version>
                 <executions>
                     <execution>
                         <phase>validate</phase>
@@ -260,7 +251,7 @@
             <plugin>
                 <groupId>org.jacoco</groupId>
                 <artifactId>jacoco-maven-plugin</artifactId>
-                <version>0.8.6</version>
+                <version>0.8.9</version>
                 <configuration>
                     <excludes>
                         <exclude>**/*com/splunk/hecclient/examples/**/*</exclude>
@@ -311,7 +302,7 @@
             <plugin>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.22.2</version>
+                <version>3.0.0</version>
                 <configuration>
                     <argLine>${surefireArgLine}</argLine>
diff --git a/src/main/java/com/splunk/hecclient/ConcurrentHec.java b/src/main/java/com/splunk/hecclient/ConcurrentHec.java
index 804b9ba1..7a57b8ed 100644
--- a/src/main/java/com/splunk/hecclient/ConcurrentHec.java
+++ b/src/main/java/com/splunk/hecclient/ConcurrentHec.java
@@ -37,7 +37,7 @@ public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, Poll
}
public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, PollerCallback cb, LoadBalancerInf loadBalancer) {
- batches = new LinkedBlockingQueue<>(100);
+ batches = new LinkedBlockingQueue<>(config.getConcurrentHecQueueCapacity());
ThreadFactory e = (Runnable r) -> new Thread(r, "Concurrent-HEC-worker");
executorService = Executors.newFixedThreadPool(numberOfThreads, e);
initHec(numberOfThreads, useAck, config, cb, loadBalancer);
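Note: the queue being sized here is a standard bounded java.util.concurrent.LinkedBlockingQueue, so once it fills up, producers block until a HEC worker drains a batch; making the capacity configurable lets users trade memory for backpressure. A minimal, self-contained sketch of that behavior (the class name and the tiny capacity of 2 are illustrative only):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueCapacityDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue, as in ConcurrentHec (tiny capacity to show the effect).
        LinkedBlockingQueue<String> batches = new LinkedBlockingQueue<>(2);

        batches.put("batch-1");
        batches.put("batch-2");

        // put() would now block; a timed offer() fails fast instead.
        boolean accepted = batches.offer("batch-3", 100, TimeUnit.MILLISECONDS);
        System.out.println("third batch accepted: " + accepted); // prints: false
    }
}
```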
diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java
index d29eccb5..b38d20bb 100644
--- a/src/main/java/com/splunk/hecclient/HecConfig.java
+++ b/src/main/java/com/splunk/hecclient/HecConfig.java
@@ -39,6 +39,7 @@ public final class HecConfig {
private int lbPollInterval = 120; // in seconds
private String kerberosPrincipal;
private String kerberosKeytabPath;
+ private int concurrentHecQueueCapacity = 100;
public HecConfig(List<String> uris, String token) {
this.uris = uris;
@@ -101,6 +102,10 @@ public int getBackoffThresholdSeconds() {
return backoffThresholdSeconds;
}
+ public int getConcurrentHecQueueCapacity() {
+ return concurrentHecQueueCapacity;
+ }
+
public boolean getHasCustomTrustStore() { return hasCustomTrustStore; }
public String getTrustStorePath() { return trustStorePath; }
@@ -207,6 +212,11 @@ public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
return this;
}
+ public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
+ this.concurrentHecQueueCapacity = concurrentHecQueueCapacity;
+ return this;
+ }
+
public boolean kerberosAuthEnabled() {
return !kerberosPrincipal().isEmpty();
}
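For reference, the new setter chains with HecConfig's existing fluent API. A hedged usage sketch (the endpoint URI and token are placeholders):

```java
import java.util.Arrays;

import com.splunk.hecclient.HecConfig;

public class HecConfigCapacityExample {
    public static void main(String[] args) {
        // Placeholder HEC endpoint and token; the setter overrides the default of 100.
        HecConfig config = new HecConfig(Arrays.asList("https://localhost:8088"), "hec-token")
                .setConcurrentHecQueueCapacity(200);
        System.out.println(config.getConcurrentHecQueueCapacity()); // prints: 200
    }
}
```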
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 2d18f587..84097c79 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -55,6 +55,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
+ // only applicable when "splunk.hec.threads" > 1
+ static final String QUEUE_CAPACITY_CONF = "splunk.hec.concurrent.queue.capacity";
+
// Acknowledgement Parameters
// Use Ack
static final String ACK_CONF = "splunk.hec.ack.enabled";
@@ -192,6 +195,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype";
static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host";
+ static final String QUEUE_CAPACITY_DOC = "This setting controls the capacity of the in-memory queue that buffers event batches for concurrent HEC clients. Only applicable when splunk.hec.threads > 1. By default, this setting is 100.";
// Load Balancer
static final String LB_POLL_INTERVAL_DOC = "This setting controls the load balancer polling interval. By default, "
+ "this setting is 120 seconds.";
@@ -257,6 +261,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final boolean enableTimestampExtraction;
final String regex;
final String timestampFormat;
+ final int queueCapacity;
SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
super(conf(), taskConfig);
@@ -312,7 +317,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
validateRegexForTimestamp(regex);
-
+ queueCapacity = getInt(QUEUE_CAPACITY_CONF);
+ validateQueueCapacity(queueCapacity);
}
@@ -360,7 +366,8 @@ public static ConfigDef conf() {
.define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
.define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
- .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC);
+ .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
+ .define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
}
/**
@@ -384,7 +391,8 @@ public HecConfig getHecConfig() {
.setTrustStorePassword(trustStorePassword)
.setHasCustomTrustStore(hasTrustStorePath)
.setKerberosPrincipal(kerberosUserPrincipal)
- .setKerberosKeytabPath(kerberosKeytabPath);
+ .setKerberosKeytabPath(kerberosKeytabPath)
+ .setConcurrentHecQueueCapacity(queueCapacity);
return config;
}
@@ -550,6 +558,12 @@ private void validateRegexForTimestamp(String regex) {
}
}
+ private void validateQueueCapacity(int queueCapacity) {
+ if (queueCapacity <= 0) {
+ throw new ConfigException("queue capacity should be greater than 0, but was: " + queueCapacity);
+ }
+ }
+
private static boolean getNamedGroupCandidates(String regex) {
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
while (m.find()) {
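The new key is parsed through Kafka's ConfigDef with a default of 100 and then range-checked by validateQueueCapacity. A standalone sketch of the same parse/default behavior using only Kafka's public config API (the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class QueueCapacityConfigSketch {
    public static void main(String[] args) {
        // Mirrors the new define(): INT type, default 100, LOW importance.
        ConfigDef def = new ConfigDef()
                .define("splunk.hec.concurrent.queue.capacity", ConfigDef.Type.INT,
                        100, ConfigDef.Importance.LOW, "queue capacity for concurrent HEC");

        Map<String, String> props = new HashMap<>();
        props.put("splunk.hec.concurrent.queue.capacity", "200");

        AbstractConfig parsed = new AbstractConfig(def, props);
        System.out.println(parsed.getInt("splunk.hec.concurrent.queue.capacity")); // prints: 200
    }
}
```

An alternative to the manual check would be ConfigDef's declarative validator, e.g. ConfigDef.Range.atLeast(1) passed to define(), which rejects non-positive values at parse time.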
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 4864bfaf..652ff6a4 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -261,6 +261,34 @@ public void testInvalidSplunkConfigurationsWithValidationEnabled() {
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}
+ @Test
+ public void testValidQueueCapacity() {
+ final Map<String, String> configs = new HashMap<>();
+ addNecessaryConfigs(configs);
+ SplunkSinkConnector connector = new SplunkSinkConnector();
+ configs.put("splunk.hec.concurrent.queue.capacity", "100");
+ configs.put("topics", "b");
+ configs.put("splunk.indexes", "b");
+ MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+ clientInstance.client.setResponse(CloseableHttpClientMock.success);
+ connector.setHecInstance(clientInstance);
+ Assertions.assertDoesNotThrow(()->connector.validate(configs));
+ }
+
+ @Test
+ public void testInvalidQueueCapacity() {
+ final Map<String, String> configs = new HashMap<>();
+ addNecessaryConfigs(configs);
+ SplunkSinkConnector connector = new SplunkSinkConnector();
+ configs.put("splunk.hec.concurrent.queue.capacity", "-1");
+ configs.put("topics", "b");
+ configs.put("splunk.indexes", "b");
+ MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+ clientInstance.client.setResponse(CloseableHttpClientMock.success);
+ connector.setHecInstance(clientInstance);
+ Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+ }
+
private void addNecessaryConfigs(Map<String, String> configs) {
configs.put(URI_CONF, TEST_URI);
configs.put(TOKEN_CONF, "blah");
diff --git a/test/testcases/test_data_onboarding.py b/test/testcases/test_data_onboarding.py
index 839157b8..b3116cb4 100644
--- a/test/testcases/test_data_onboarding.py
+++ b/test/testcases/test_data_onboarding.py
@@ -27,20 +27,20 @@ def test_data_onboarding(self, setup, test_scenario, test_input, expected):
logger.info("Splunk received %s events in the last hour", len(events))
assert len(events) == expected
- @pytest.mark.parametrize("test_scenario, test_input, expected", [
- ("protobuf", "sourcetype::protobuf", 1),
- ])
- def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
- logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
- search_query = f"index={setup['splunk_index']} | search {test_input}"
- logger.info(search_query)
- events = check_events_from_splunk(start_time="-15m@m",
- url=setup["splunkd_url"],
- user=setup["splunk_user"],
- query=[f"search {search_query}"],
- password=setup["splunk_password"])
- logger.info("Splunk received %s events in the last hour", len(events))
- assert len(events) == expected
+ # @pytest.mark.parametrize("test_scenario, test_input, expected", [
+ # ("protobuf", "sourcetype::protobuf", 1),
+ # ])
+ # def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
+ # logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
+ # search_query = f"index={setup['splunk_index']} | search {test_input}"
+ # logger.info(search_query)
+ # events = check_events_from_splunk(start_time="-15m@m",
+ # url=setup["splunkd_url"],
+ # user=setup["splunk_user"],
+ # query=[f"search {search_query}"],
+ # password=setup["splunk_password"])
+ # logger.info("Splunk received %s events in the last hour", len(events))
+ # assert len(events) == expected
@pytest.mark.parametrize("test_scenario, test_input, expected", [
("date_format", "latest=1365209605.000 sourcetype::date_format", "2010-06-13T23:11:52.454+00:00"),