From 4e58b43b6e2ae82f90bf8c2c84000930a8b4b8d4 Mon Sep 17 00:00:00 2001 From: Ludovic Boutros Date: Wed, 18 Oct 2023 15:12:49 +0200 Subject: [PATCH 1/4] Feat: Add auto_extract_timestamp parameter --- .../java/com/splunk/hecclient/HecConfig.java | 8 +++ .../com/splunk/hecclient/HecURIBuilder.java | 32 ++++++++++++ .../java/com/splunk/hecclient/Indexer.java | 11 +++- .../connect/SplunkSinkConnectorConfig.java | 10 +++- .../splunk/hecclient/HecURIBuilderTest.java | 50 +++++++++++++++++++ 5 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/splunk/hecclient/HecURIBuilder.java create mode 100644 src/test/java/com/splunk/hecclient/HecURIBuilderTest.java diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index b38d20bb..d3cb7dbc 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -40,6 +40,7 @@ public final class HecConfig { private String kerberosPrincipal; private String kerberosKeytabPath; private int concurrentHecQueueCapacity = 100; + private Boolean autoExtractTimestamp; public HecConfig(List uris, String token) { this.uris = uris; @@ -114,6 +115,8 @@ public int getConcurrentHecQueueCapacity() { public String getTrustStorePassword() { return trustStorePassword; } + public Boolean getAutoExtractTimestamp() { return autoExtractTimestamp; } + public HecConfig setDisableSSLCertVerification(boolean disableVerfication) { disableSSLCertVerification = disableVerfication; return this; @@ -217,6 +220,11 @@ public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) { return this; } + public HecConfig setAutoExtractTimestamp(Boolean autoExtractTimestamp) { + this.autoExtractTimestamp = autoExtractTimestamp; + return this; + } + public boolean kerberosAuthEnabled() { return !kerberosPrincipal().isEmpty(); } diff --git a/src/main/java/com/splunk/hecclient/HecURIBuilder.java b/src/main/java/com/splunk/hecclient/HecURIBuilder.java new file mode 100644 index 00000000..d9c60f50 --- /dev/null +++ b/src/main/java/com/splunk/hecclient/HecURIBuilder.java @@ -0,0 +1,32 @@ +package com.splunk.hecclient; + +import org.apache.http.client.utils.URIBuilder; + +import java.net.URI; +import java.net.URISyntaxException; + +public class HecURIBuilder { + public static final String AUTO_EXTRACT_TIMESTAMP_PARAMETER = "auto_extract_timestamp"; + + private final String baseUrl; + private final HecConfig hecConfig; + + public HecURIBuilder(String baseUrl, HecConfig hecConfig) { + this.baseUrl = baseUrl; + this.hecConfig = hecConfig; + } + + public URI getURI(String endpoint) { + try { + URIBuilder uriBuilder = new URIBuilder(baseUrl) + .setPath(endpoint); + + if (hecConfig.getAutoExtractTimestamp() != null) { + uriBuilder.addParameter(AUTO_EXTRACT_TIMESTAMP_PARAMETER, hecConfig.getAutoExtractTimestamp().toString()); + } + return uriBuilder.build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index d45dbc7e..201535fb 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -20,6 +20,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.splunk.kafka.connect.VersionUtils; import com.sun.security.auth.module.Krb5LoginModule; + +import java.net.URI; +import java.net.URISyntaxException; import java.security.Principal; import java.security.PrivilegedAction; import java.util.HashMap; @@ -37,6 +40,7 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicHeader; import org.apache.http.protocol.HttpContext; @@ -51,6 +55,8 @@ final class Indexer implements IndexerInf { private static final ObjectMapper jsonMapper = new ObjectMapper(); private HecConfig hecConfig; + + private HecURIBuilder hecURIBuilder; private Configuration config; private CloseableHttpClient httpClient; private HttpContext context; @@ -72,6 +78,7 @@ public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig this.hecToken = config.getToken(); this.poller = poller; this.context = HttpClientContext.create(); + this.hecURIBuilder = new HecURIBuilder(baseUrl, hecConfig); backPressure = 0; channel = new HecChannel(this); @@ -132,8 +139,8 @@ public HecChannel getChannel() { @Override public boolean send(final EventBatch batch) { String endpoint = batch.getRestEndpoint(); - String url = baseUrl + endpoint; - final HttpPost httpPost = new HttpPost(url); + URI uri = hecURIBuilder.getURI(endpoint); + final HttpPost httpPost = new HttpPost(uri); httpPost.setHeaders(headers); if (batch.isEnableCompression()) { httpPost.setHeader("Content-Encoding", "gzip"); diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index f90db541..1ee02dae 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -80,6 +80,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path"; static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type"; static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password"; + static final String AUTO_EXTRACT_TIMESTAMP_CONF = "splunk.hec.auto.extract.timestamp"; + //Headers static final String HEADER_SUPPORT_CONF = "splunk.header.support"; static final String HEADER_CUSTOM_CONF = "splunk.header.custom"; @@ -187,6 +189,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store."; static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...)."; static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store."; + static final String AUTO_EXTRACT_TIMESTAMP_DOC = "Sends timestamped events to HTTP Event Collector using the Splunk platform JSON event protocol when auto_extract_timestamp is set to \"true\" in the /event URL."; static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override"; static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them" @@ -264,8 +267,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String regex; final String timestampFormat; final int queueCapacity; - final String timeZone; + final Boolean autoExtractTimestamp; SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); @@ -324,6 +327,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { validateRegexForTimestamp(regex); queueCapacity = getInt(QUEUE_CAPACITY_CONF); validateQueueCapacity(queueCapacity); + autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF); } @@ -341,6 +345,7 @@ public static ConfigDef conf() { .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) .define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC) .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) + .define(AUTO_EXTRACT_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW, AUTO_EXTRACT_TIMESTAMP_DOC) .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC) @@ -398,7 +403,8 @@ public HecConfig getHecConfig() { .setHasCustomTrustStore(hasTrustStorePath) .setKerberosPrincipal(kerberosUserPrincipal) .setKerberosKeytabPath(kerberosKeytabPath) - .setConcurrentHecQueueCapacity(queueCapacity); + .setConcurrentHecQueueCapacity(queueCapacity) + .setAutoExtractTimestamp(autoExtractTimestamp); return config; } diff --git a/src/test/java/com/splunk/hecclient/HecURIBuilderTest.java b/src/test/java/com/splunk/hecclient/HecURIBuilderTest.java new file mode 100644 index 00000000..643ac636 --- /dev/null +++ b/src/test/java/com/splunk/hecclient/HecURIBuilderTest.java @@ -0,0 +1,50 @@ +package com.splunk.hecclient; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.util.Collections; + +import static com.splunk.hecclient.JsonEventBatch.ENDPOINT; + +public class HecURIBuilderTest { + private static final String BASE_URL = "https://localhost:8088"; + private static final String TOKEN = "mytoken"; + + @Test + public void testDefaultValues() { + HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN); + HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig); + + URI uri = builder.getURI(ENDPOINT); + + Assert.assertEquals("https://localhost:8088/services/collector/event", uri.toString()); + } + + @Test + public void testAutoExtractTimestamp() { + { + HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN) + .setAutoExtractTimestamp(true); + HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig); + + URI uri = builder.getURI(ENDPOINT); + + Assert.assertEquals("https://localhost:8088/services/collector/event?" + + HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=true", + uri.toString()); + } + { + HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN) + .setAutoExtractTimestamp(false); + HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig); + + URI uri = builder.getURI(ENDPOINT); + + Assert.assertEquals("https://localhost:8088/services/collector/event?" + + HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=false", + uri.toString()); + } + } +} \ No newline at end of file From f085b261f50edb8fae9143bfa78d8801a8c7b07c Mon Sep 17 00:00:00 2001 From: Ludovic Boutros Date: Wed, 18 Oct 2023 15:13:59 +0200 Subject: [PATCH 2/4] Fix: date extraction test should use locale --- .../kafka/connect/SplunkSinkTaskTest.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index 998b9238..fe06f0ac 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -30,6 +30,8 @@ import org.junit.Test; import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Instant; import java.util.*; public class SplunkSinkTaskTest { @@ -249,14 +251,27 @@ public void putWithRawAndAck() { @Test public void checkExtractedTimestamp() { + + SplunkSinkTask task = new SplunkSinkTask(); - Collection record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}"); UnitUtil uu = new UnitUtil(0); Map config = uu.createTaskConfig(); config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false)); config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true)); config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"time\\\":\\s*\\\"(?