diff --git a/README.md b/README.md
index 4bca4bb9..a1875176 100644
--- a/README.md
+++ b/README.md
@@ -107,6 +107,7 @@ Use the below schema to configure Splunk Connect for Kafka
     "splunk.hec.raw": "",
     "splunk.hec.raw.line.breaker": "",
     "splunk.hec.json.event.enrichment": "",
+    "splunk.hec.auto.extract.timestamp": "",
     "value.converter": "",
     "value.converter.schema.registry.url": "",
     "value.converter.schemas.enable": "",
@@ -196,10 +197,11 @@ Use the below schema to configure Splunk Connect for Kafka
 |-------- |----------------------------|-----------------------|
 | `splunk.hec.raw.line.breaker` | Only applicable to /raw HEC endpoint. The setting is used to specify a custom line breaker to help Splunk separate the events correctly.<br/>**NOTE:**<br/>For example, you can specify `"#####"` as a special line breaker. Internally, the Splunk Kafka Connector will append this line breaker to every Kafka record to form a clear event boundary. The connector performs data injection in batch mode. On the Splunk platform side, you can configure **`props.conf`** to set up a line breaker for the sourcetypes. Then the Splunk software will correctly break events for data flowing through the /raw HEC endpoint. For questions on how and when to specify a line breaker, go to the FAQ section.|`""`|
 ##### /event endpoint only
-| Name | Description | Default Value |
-|-------- |----------------------------|-----------------------|
-| `splunk.hec.json.event.enrichment` | Only applicable to /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a list of key-value pairs separated by ",". The configured enrichment metadata will be indexed along with raw event data by Splunk software.<br/>**NOTE:**<br/>Data enrichment for /event HEC endpoint is only available in Splunk Enterprise 6.5 and above. By default, this setting is empty. See [Documentation](http://dev.splunk.com/view/event-collector/SP-CAAAE8Y#indexedfield) for more information.<br/>**Example:** `org=fin,bu=south-east-us`||
-| `splunk.hec.track.data` | When set to `true`, data loss and data injection latency metadata will be indexed along with raw data. This setting only works in conjunction with /event HEC endpoint (`"splunk.hec.raw" : "false"`). Valid settings are `true` or `false`. |`false`|
+| Name | Description | Default Value |
+|-------- |----------------------------|---------------|
+| `splunk.hec.json.event.enrichment` | Only applicable to /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a list of key-value pairs separated by ",". The configured enrichment metadata will be indexed along with raw event data by Splunk software.<br/>**NOTE:**<br/>Data enrichment for /event HEC endpoint is only available in Splunk Enterprise 6.5 and above. By default, this setting is empty. See [Documentation](http://dev.splunk.com/view/event-collector/SP-CAAAE8Y#indexedfield) for more information.<br/>**Example:** `org=fin,bu=south-east-us` | |
+| `splunk.hec.track.data` | When set to `true`, data loss and data injection latency metadata will be indexed along with raw data. This setting only works in conjunction with /event HEC endpoint (`"splunk.hec.raw" : "false"`). Valid settings are `true` or `false`. | `false` |
+| `splunk.hec.auto.extract.timestamp` | When set to `true`, it forces Splunk HEC to extract the timestamp from the event envelope/event data. See [/services/collector/event](https://docs.splunk.com/Documentation/Splunk/9.1.1/RESTREF/RESTinput#services.2Fcollector.2Fevent) for more details. | `unset` |
 
 ### Headers Parameters
 #### Use Headers
diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java
index b38d20bb..d3cb7dbc 100644
--- a/src/main/java/com/splunk/hecclient/HecConfig.java
+++ b/src/main/java/com/splunk/hecclient/HecConfig.java
@@ -40,6 +40,7 @@ public final class HecConfig {
     private String kerberosPrincipal;
     private String kerberosKeytabPath;
     private int concurrentHecQueueCapacity = 100;
+    private Boolean autoExtractTimestamp;
 
     public HecConfig(List<String> uris, String token) {
         this.uris = uris;
@@ -114,6 +115,8 @@ public int getConcurrentHecQueueCapacity() {
 
     public String getTrustStorePassword() { return trustStorePassword; }
 
+    public Boolean getAutoExtractTimestamp() { return autoExtractTimestamp; }
+
     public HecConfig setDisableSSLCertVerification(boolean disableVerfication) {
         disableSSLCertVerification = disableVerfication;
         return this;
@@ -217,6 +220,11 @@ public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
         return this;
     }
 
+    public HecConfig setAutoExtractTimestamp(Boolean autoExtractTimestamp) {
+        this.autoExtractTimestamp = autoExtractTimestamp;
+        return this;
+    }
+
     public boolean kerberosAuthEnabled() {
         return !kerberosPrincipal().isEmpty();
     }
diff --git a/src/main/java/com/splunk/hecclient/HecURIBuilder.java b/src/main/java/com/splunk/hecclient/HecURIBuilder.java
new file mode 100644
index 00000000..d9c60f50
--- /dev/null
+++ b/src/main/java/com/splunk/hecclient/HecURIBuilder.java
@@ -0,0 +1,32 @@
+package com.splunk.hecclient;
+
+import org.apache.http.client.utils.URIBuilder;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class HecURIBuilder {
+    public static final String AUTO_EXTRACT_TIMESTAMP_PARAMETER = "auto_extract_timestamp";
+
+    private final String baseUrl;
+    private final HecConfig hecConfig;
+
+    public HecURIBuilder(String baseUrl, HecConfig hecConfig) {
+        this.baseUrl = baseUrl;
+        this.hecConfig = hecConfig;
+    }
+
+    public URI getURI(String endpoint) {
+        try {
+            URIBuilder uriBuilder = new URIBuilder(baseUrl)
+                    .setPath(endpoint);
+
+            if (hecConfig.getAutoExtractTimestamp() != null) {
+                uriBuilder.addParameter(AUTO_EXTRACT_TIMESTAMP_PARAMETER, hecConfig.getAutoExtractTimestamp().toString());
+            }
+            return uriBuilder.build();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
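To see what `HecConfig` and `HecURIBuilder` produce end to end, here is a small standalone sketch. The demo class, base URL, and token are illustrative only (not part of the patch); the endpoint literal matches the one asserted in `HecURIBuilderTest` further down.

```java
import java.net.URI;
import java.util.Collections;

import com.splunk.hecclient.HecConfig;
import com.splunk.hecclient.HecURIBuilder;

// Hypothetical demo class, not part of the patch.
public class HecURIBuilderDemo {
    public static void main(String[] args) {
        // Flag set: the query parameter is appended to the endpoint path.
        HecConfig withFlag = new HecConfig(Collections.emptyList(), "mytoken")
                .setAutoExtractTimestamp(true);
        URI u1 = new HecURIBuilder("https://localhost:8088", withFlag)
                .getURI("/services/collector/event");
        System.out.println(u1); // https://localhost:8088/services/collector/event?auto_extract_timestamp=true

        // Flag left unset (null): no parameter is added at all.
        HecConfig unset = new HecConfig(Collections.emptyList(), "mytoken");
        URI u2 = new HecURIBuilder("https://localhost:8088", unset)
                .getURI("/services/collector/event");
        System.out.println(u2); // https://localhost:8088/services/collector/event
    }
}
```

The boxed `Boolean` gives the flag three states: `true` and `false` are forwarded explicitly as `auto_extract_timestamp`, while `null` (the default) adds no parameter, so existing deployments see exactly the old URL.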
diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java
index d45dbc7e..201535fb 100644
--- a/src/main/java/com/splunk/hecclient/Indexer.java
+++ b/src/main/java/com/splunk/hecclient/Indexer.java
@@ -20,6 +20,9 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.splunk.kafka.connect.VersionUtils;
 import com.sun.security.auth.module.Krb5LoginModule;
+
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.util.HashMap;
@@ -37,6 +40,7 @@
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.protocol.HttpContext;
@@ -51,6 +55,8 @@ final class Indexer implements IndexerInf {
     private static final ObjectMapper jsonMapper = new ObjectMapper();
 
     private HecConfig hecConfig;
+
+    private HecURIBuilder hecURIBuilder;
     private Configuration config;
     private CloseableHttpClient httpClient;
     private HttpContext context;
@@ -72,6 +78,7 @@ public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig
         this.hecToken = config.getToken();
         this.poller = poller;
         this.context = HttpClientContext.create();
+        this.hecURIBuilder = new HecURIBuilder(baseUrl, hecConfig);
 
         backPressure = 0;
         channel = new HecChannel(this);
@@ -132,8 +139,8 @@ public HecChannel getChannel() {
     @Override
     public boolean send(final EventBatch batch) {
         String endpoint = batch.getRestEndpoint();
-        String url = baseUrl + endpoint;
-        final HttpPost httpPost = new HttpPost(url);
+        URI uri = hecURIBuilder.getURI(endpoint);
+        final HttpPost httpPost = new HttpPost(uri);
         httpPost.setHeaders(headers);
         if (batch.isEnableCompression()) {
             httpPost.setHeader("Content-Encoding", "gzip");
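A side effect worth noting from the hunk above: `send()` resolves the endpoint per batch via `batch.getRestEndpoint()`, so when `splunk.hec.auto.extract.timestamp` is set the query parameter is appended for raw batches as well as event batches, even though the README documents it under the /event endpoint. When the setting is left unset, `HecURIBuilder` produces the same URL as the old `baseUrl + endpoint` concatenation, so the change is backward compatible.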
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index f90db541..ea16b9d9 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -80,6 +80,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
     static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type";
     static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
+    static final String AUTO_EXTRACT_TIMESTAMP_CONF = "splunk.hec.auto.extract.timestamp";
+
     //Headers
     static final String HEADER_SUPPORT_CONF = "splunk.header.support";
     static final String HEADER_CUSTOM_CONF = "splunk.header.custom";
@@ -187,6 +189,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
     static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...).";
     static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
+    static final String AUTO_EXTRACT_TIMESTAMP_DOC = "When set to true, it forces Splunk HEC to extract the timestamp from event envelope/event data.";
     static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
     static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
 
@@ -264,8 +267,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     final String regex;
     final String timestampFormat;
     final int queueCapacity;
-
     final String timeZone;
+    final Boolean autoExtractTimestamp;
 
     SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
         super(conf(), taskConfig);
@@ -324,6 +327,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         validateRegexForTimestamp(regex);
         queueCapacity = getInt(QUEUE_CAPACITY_CONF);
         validateQueueCapacity(queueCapacity);
+        autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
     }
 
 
@@ -341,6 +345,7 @@ public static ConfigDef conf() {
             .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
             .define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
             .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
+            .define(AUTO_EXTRACT_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW, AUTO_EXTRACT_TIMESTAMP_DOC)
             .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
             .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
             .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
@@ -398,7 +403,8 @@ public HecConfig getHecConfig() {
             .setHasCustomTrustStore(hasTrustStorePath)
             .setKerberosPrincipal(kerberosUserPrincipal)
             .setKerberosKeytabPath(kerberosKeytabPath)
-            .setConcurrentHecQueueCapacity(queueCapacity);
+            .setConcurrentHecQueueCapacity(queueCapacity)
+            .setAutoExtractTimestamp(autoExtractTimestamp);
         return config;
     }
 
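Because `AUTO_EXTRACT_TIMESTAMP_CONF` is defined as a `BOOLEAN` with a `null` default, `getBoolean` returns `null` when the key is absent; that is what the README's `unset` default means in practice. A hedged configuration sketch follows — the URI and token keys are the connector's standard ones and the values are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
public class AutoExtractTimestampConfigExample {
    public static Map<String, String> taskConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("splunk.hec.uri", "https://localhost:8088");                  // standard connector key
        config.put("splunk.hec.token", "00000000-0000-0000-0000-000000000000");  // standard connector key
        // New in this patch: parsed by getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF) and
        // forwarded to HEC as ?auto_extract_timestamp=true. Omitting the key leaves
        // the value null, so the request URL is unchanged from pre-patch behavior.
        config.put("splunk.hec.auto.extract.timestamp", "true");
        return config;
    }
}
```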
diff --git a/src/test/java/com/splunk/hecclient/HecURIBuilderTest.java b/src/test/java/com/splunk/hecclient/HecURIBuilderTest.java
new file mode 100644
index 00000000..643ac636
--- /dev/null
+++ b/src/test/java/com/splunk/hecclient/HecURIBuilderTest.java
@@ -0,0 +1,50 @@
+package com.splunk.hecclient;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Collections;
+
+import static com.splunk.hecclient.JsonEventBatch.ENDPOINT;
+
+public class HecURIBuilderTest {
+    private static final String BASE_URL = "https://localhost:8088";
+    private static final String TOKEN = "mytoken";
+
+    @Test
+    public void testDefaultValues() {
+        HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN);
+        HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);
+
+        URI uri = builder.getURI(ENDPOINT);
+
+        Assert.assertEquals("https://localhost:8088/services/collector/event", uri.toString());
+    }
+
+    @Test
+    public void testAutoExtractTimestamp() {
+        {
+            HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN)
+                    .setAutoExtractTimestamp(true);
+            HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);
+
+            URI uri = builder.getURI(ENDPOINT);
+
+            Assert.assertEquals("https://localhost:8088/services/collector/event?"
+                            + HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=true",
+                    uri.toString());
+        }
+        {
+            HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN)
+                    .setAutoExtractTimestamp(false);
+            HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);
+
+            URI uri = builder.getURI(ENDPOINT);
+
+            Assert.assertEquals("https://localhost:8088/services/collector/event?"
+                            + HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=false",
+                    uri.toString());
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
index 2222ac4e..2198fae6 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
@@ -28,6 +28,9 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.*;
 
 public class SplunkSinkTaskTest {
@@ -266,14 +269,27 @@ public void putWithRawAndAck() {
     @Test
     public void checkExtractedTimestamp() {
+
+        SplunkSinkTask task = new SplunkSinkTask();
-        Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");
         UnitUtil uu = new UnitUtil(0);
         Map<String, String> config = uu.createTaskConfig();
         config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
         config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
         config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"time\\\":\\s*\\\"(?