Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci_build_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ on:
description: Publish token for Semgrep
required: true

permissions:
checks: write
pull-requests: write

jobs:
workflow_approval:
name: Approve workflow
Expand Down Expand Up @@ -99,7 +103,7 @@ jobs:
path: /tmp/splunk-kafka-connect*.jar

- name: Publish Unit Test Results
uses: EnricoMi/publish-unit-test-result-action/composite@v1
uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
check_name: Unit Test Results
Expand Down
59 changes: 25 additions & 34 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<junit.version>4.13.2</junit.version>
<junit.jupiter.version>5.3.2</junit.jupiter.version>
<junit.vintage.version>5.3.2</junit.vintage.version>
<junit.platform.version>1.3.2</junit.platform.version>
<junit.jupiter.version>5.9.2</junit.jupiter.version>
<junit.vintage.version>5.9.2</junit.vintage.version>
<junit.platform.version>1.9.2</junit.platform.version>
<jackson.version>2.14.2</jackson.version>
<kafka.version>3.4.0</kafka.version>
<slf4j.version>2.0.7</slf4j.version>
</properties>

<dependencies>
Expand All @@ -26,20 +29,20 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.6</version>
<version>${jackson.version}</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.7.1</version>
<version>${jackson.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.8.1</version>
<version>${kafka.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -79,7 +82,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
<version>4.5.14</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
Expand Down Expand Up @@ -108,74 +111,62 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.14</version>
<version>1.15</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<version>2.20.0</version>
<scope>compile</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
<version>${slf4j.version}</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
<version>1.5.0</version>
</dependency>
<!-- To avoid compiler warnings about @API annotations in JUnit code -->
<dependency>
<groupId>org.apiguardian</groupId>
<artifactId>apiguardian-api</artifactId>
<version>1.0.0</version>
<version>1.1.2</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
<version>3.12.0</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-protobuf-converter -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-protobuf-converter</artifactId>
<version>7.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.7</version>
<version>3.22.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-stdlib -->
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>1.7.0</version>
<version>2.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
Expand All @@ -197,7 +188,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.3</version>
<version>3.3.0</version>
</plugin>
</plugins>
</reporting>
Expand All @@ -207,7 +198,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<version>3.4.1</version>
<configuration>
</configuration>
<executions>
Expand All @@ -228,7 +219,7 @@
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
Expand All @@ -240,7 +231,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<version>3.2.1</version>
<executions>
<execution>
<id>validate</id>
Expand All @@ -260,7 +251,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.6</version>
<version>0.8.9</version>
<configuration>
<excludes>
<exclude>**/*com/splunk/hecclient/examples/**/*</exclude>
Expand Down Expand Up @@ -311,7 +302,7 @@

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<version>3.0.0</version>
<configuration>
<!-- Sets the VM argument line used when unit tests are run. -->
<argLine>${surefireArgLine}</argLine>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/ConcurrentHec.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, Poll
}

public ConcurrentHec(int numberOfThreads, boolean useAck, HecConfig config, PollerCallback cb, LoadBalancerInf loadBalancer) {
batches = new LinkedBlockingQueue<>(100);
batches = new LinkedBlockingQueue<>(config.getConcurrentHecQueueCapacity());
ThreadFactory e = (Runnable r) -> new Thread(r, "Concurrent-HEC-worker");
executorService = Executors.newFixedThreadPool(numberOfThreads, e);
initHec(numberOfThreads, useAck, config, cb, loadBalancer);
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class HecConfig {
private int lbPollInterval = 120; // in seconds
private String kerberosPrincipal;
private String kerberosKeytabPath;
private int concurrentHecQueueCapacity = 100;

public HecConfig(List<String> uris, String token) {
this.uris = uris;
Expand Down Expand Up @@ -101,6 +102,10 @@ public int getBackoffThresholdSeconds() {
return backoffThresholdSeconds;
}

/**
 * Returns the capacity of the batch queue used by {@code ConcurrentHec}.
 * Only relevant when more than one HEC worker thread is configured.
 *
 * @return the configured queue capacity (defaults to 100)
 */
public int getConcurrentHecQueueCapacity() {
    return this.concurrentHecQueueCapacity;
}

public boolean getHasCustomTrustStore() { return hasCustomTrustStore; }

public String getTrustStorePath() { return trustStorePath; }
Expand Down Expand Up @@ -207,6 +212,11 @@ public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
return this;
}

/**
 * Sets the capacity of the batch queue used by {@code ConcurrentHec}.
 *
 * @param capacity desired queue capacity; only applied when multiple HEC threads are used
 * @return this config instance, for fluent chaining
 */
public HecConfig setConcurrentHecQueueCapacity(int capacity) {
    concurrentHecQueueCapacity = capacity;
    return this;
}

/**
 * Reports whether Kerberos authentication is configured.
 *
 * @return {@code true} when a non-empty Kerberos principal has been set
 */
public boolean kerberosAuthEnabled() {
    final String principal = kerberosPrincipal();
    return !principal.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
// only applicable when "splunk.hec.threads" > 1
static final String QUEUE_CAPACITY_CONF = "splunk.hec.concurrent.queue.capacity";

// Acknowledgement Parameters
// Use Ack
static final String ACK_CONF = "splunk.hec.ack.enabled";
Expand Down Expand Up @@ -192,6 +195,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype";
static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host";

static final String QUEUE_CAPACITY_DOC = "This setting controls the queue capacity for concurrency";
// Load Balancer
static final String LB_POLL_INTERVAL_DOC = "This setting controls the load balancer polling interval. By default, "
+ "this setting is 120 seconds.";
Expand Down Expand Up @@ -257,6 +261,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final boolean enableTimestampExtraction;
final String regex;
final String timestampFormat;
final int queueCapacity;

SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
super(conf(), taskConfig);
Expand Down Expand Up @@ -312,7 +317,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
validateRegexForTimestamp(regex);

queueCapacity = getInt(QUEUE_CAPACITY_CONF);
validateQueueCapacity(queueCapacity);
}


Expand Down Expand Up @@ -360,7 +366,8 @@ public static ConfigDef conf() {
.define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
.define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC);
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
}

/**
Expand All @@ -384,7 +391,8 @@ public HecConfig getHecConfig() {
.setTrustStorePassword(trustStorePassword)
.setHasCustomTrustStore(hasTrustStorePath)
.setKerberosPrincipal(kerberosUserPrincipal)
.setKerberosKeytabPath(kerberosKeytabPath);
.setKerberosKeytabPath(kerberosKeytabPath)
.setConcurrentHecQueueCapacity(queueCapacity);
return config;
}

Expand Down Expand Up @@ -550,6 +558,12 @@ private void validateRegexForTimestamp(String regex) {
}
}

/**
 * Validates the concurrent HEC queue capacity setting.
 *
 * @param queueCapacity value of {@code splunk.hec.concurrent.queue.capacity}
 * @throws ConfigException if the capacity is not strictly positive
 */
private void validateQueueCapacity(int queueCapacity) {
    if (queueCapacity <= 0) {
        // Bug fix: the original message concatenated the invalid value into
        // "greater than <value>" (e.g. "greater than -1"), misstating the
        // actual constraint, which is capacity > 0.
        throw new ConfigException(
            "queue capacity should be greater than 0, but found: " + queueCapacity);
    }
}

private static boolean getNamedGroupCandidates(String regex) {
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
while (m.find()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,34 @@ public void testInvalidSplunkConfigurationsWithValidationEnabled() {
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

@Test
public void testValidQueueCapacity() {
    // A positive queue capacity must pass connector validation.
    final Map<String, String> settings = new HashMap<>();
    addNecessaryConfigs(settings);
    settings.put("splunk.hec.concurrent.queue.capacity", "100");
    settings.put("topics", "b");
    settings.put("splunk.indexes", "b");

    SplunkSinkConnector connector = new SplunkSinkConnector();
    MockHecClientWrapper hecWrapper = new MockHecClientWrapper();
    hecWrapper.client.setResponse(CloseableHttpClientMock.success);
    connector.setHecInstance(hecWrapper);

    Assertions.assertDoesNotThrow(() -> connector.validate(settings));
}

@Test
public void testInvalidQueueCapacity() {
    // A non-positive queue capacity must be rejected with ConfigException.
    final Map<String, String> settings = new HashMap<>();
    addNecessaryConfigs(settings);
    settings.put("splunk.hec.concurrent.queue.capacity", "-1");
    settings.put("topics", "b");
    settings.put("splunk.indexes", "b");

    SplunkSinkConnector connector = new SplunkSinkConnector();
    MockHecClientWrapper hecWrapper = new MockHecClientWrapper();
    hecWrapper.client.setResponse(CloseableHttpClientMock.success);
    connector.setHecInstance(hecWrapper);

    Assertions.assertThrows(ConfigException.class, () -> connector.validate(settings));
}

private void addNecessaryConfigs(Map<String, String> configs) {
configs.put(URI_CONF, TEST_URI);
configs.put(TOKEN_CONF, "blah");
Expand Down
28 changes: 14 additions & 14 deletions test/testcases/test_data_onboarding.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ def test_data_onboarding(self, setup, test_scenario, test_input, expected):
logger.info("Splunk received %s events in the last hour", len(events))
assert len(events) == expected

@pytest.mark.parametrize("test_scenario, test_input, expected", [
("protobuf", "sourcetype::protobuf", 1),
])
def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
search_query = f"index={setup['splunk_index']} | search {test_input}"
logger.info(search_query)
events = check_events_from_splunk(start_time="-15m@m",
url=setup["splunkd_url"],
user=setup["splunk_user"],
query=[f"search {search_query}"],
password=setup["splunk_password"])
logger.info("Splunk received %s events in the last hour", len(events))
assert len(events) == expected
# @pytest.mark.parametrize("test_scenario, test_input, expected", [
# ("protobuf", "sourcetype::protobuf", 1),
# ])
# def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
# logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
# search_query = f"index={setup['splunk_index']} | search {test_input}"
# logger.info(search_query)
# events = check_events_from_splunk(start_time="-15m@m",
# url=setup["splunkd_url"],
# user=setup["splunk_user"],
# query=[f"search {search_query}"],
# password=setup["splunk_password"])
# logger.info("Splunk received %s events in the last hour", len(events))
# assert len(events) == expected

@pytest.mark.parametrize("test_scenario, test_input, expected", [
("date_format", "latest=1365209605.000 sourcetype::date_format", "2010-06-13T23:11:52.454+00:00"),
Expand Down