Skip to content

Commit 4e58b43

Browse files
author
Ludovic Boutros
committed
Feat: Add auto_extract_timestamp parameter
1 parent 4a2e368 commit 4e58b43

File tree

5 files changed

+107
-4
lines changed

5 files changed

+107
-4
lines changed

src/main/java/com/splunk/hecclient/HecConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class HecConfig {
4040
private String kerberosPrincipal;
4141
private String kerberosKeytabPath;
4242
private int concurrentHecQueueCapacity = 100;
43+
private Boolean autoExtractTimestamp;
4344

4445
public HecConfig(List<String> uris, String token) {
4546
this.uris = uris;
@@ -114,6 +115,8 @@ public int getConcurrentHecQueueCapacity() {
114115

115116
public String getTrustStorePassword() { return trustStorePassword; }
116117

118+
public Boolean getAutoExtractTimestamp() { return autoExtractTimestamp; }
119+
117120
public HecConfig setDisableSSLCertVerification(boolean disableVerfication) {
118121
disableSSLCertVerification = disableVerfication;
119122
return this;
@@ -217,6 +220,11 @@ public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
217220
return this;
218221
}
219222

223+
public HecConfig setAutoExtractTimestamp(Boolean autoExtractTimestamp) {
224+
this.autoExtractTimestamp = autoExtractTimestamp;
225+
return this;
226+
}
227+
220228
public boolean kerberosAuthEnabled() {
221229
return !kerberosPrincipal().isEmpty();
222230
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.splunk.hecclient;
2+
3+
import org.apache.http.client.utils.URIBuilder;
4+
5+
import java.net.URI;
6+
import java.net.URISyntaxException;
7+
8+
public class HecURIBuilder {
9+
public static final String AUTO_EXTRACT_TIMESTAMP_PARAMETER = "auto_extract_timestamp";
10+
11+
private final String baseUrl;
12+
private final HecConfig hecConfig;
13+
14+
public HecURIBuilder(String baseUrl, HecConfig hecConfig) {
15+
this.baseUrl = baseUrl;
16+
this.hecConfig = hecConfig;
17+
}
18+
19+
public URI getURI(String endpoint) {
20+
try {
21+
URIBuilder uriBuilder = new URIBuilder(baseUrl)
22+
.setPath(endpoint);
23+
24+
if (hecConfig.getAutoExtractTimestamp() != null) {
25+
uriBuilder.addParameter(AUTO_EXTRACT_TIMESTAMP_PARAMETER, hecConfig.getAutoExtractTimestamp().toString());
26+
}
27+
return uriBuilder.build();
28+
} catch (URISyntaxException e) {
29+
throw new RuntimeException(e);
30+
}
31+
}
32+
}

src/main/java/com/splunk/hecclient/Indexer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import com.fasterxml.jackson.databind.node.ObjectNode;
2121
import com.splunk.kafka.connect.VersionUtils;
2222
import com.sun.security.auth.module.Krb5LoginModule;
23+
24+
import java.net.URI;
25+
import java.net.URISyntaxException;
2326
import java.security.Principal;
2427
import java.security.PrivilegedAction;
2528
import java.util.HashMap;
@@ -37,6 +40,7 @@
3740
import org.apache.http.client.methods.HttpPost;
3841
import org.apache.http.client.methods.HttpUriRequest;
3942
import org.apache.http.client.protocol.HttpClientContext;
43+
import org.apache.http.client.utils.URIBuilder;
4044
import org.apache.http.impl.client.CloseableHttpClient;
4145
import org.apache.http.message.BasicHeader;
4246
import org.apache.http.protocol.HttpContext;
@@ -51,6 +55,8 @@ final class Indexer implements IndexerInf {
5155
private static final ObjectMapper jsonMapper = new ObjectMapper();
5256

5357
private HecConfig hecConfig;
58+
59+
private HecURIBuilder hecURIBuilder;
5460
private Configuration config;
5561
private CloseableHttpClient httpClient;
5662
private HttpContext context;
@@ -72,6 +78,7 @@ public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig
7278
this.hecToken = config.getToken();
7379
this.poller = poller;
7480
this.context = HttpClientContext.create();
81+
this.hecURIBuilder = new HecURIBuilder(baseUrl, hecConfig);
7582
backPressure = 0;
7683

7784
channel = new HecChannel(this);
@@ -132,8 +139,8 @@ public HecChannel getChannel() {
132139
@Override
133140
public boolean send(final EventBatch batch) {
134141
String endpoint = batch.getRestEndpoint();
135-
String url = baseUrl + endpoint;
136-
final HttpPost httpPost = new HttpPost(url);
142+
URI uri = hecURIBuilder.getURI(endpoint);
143+
final HttpPost httpPost = new HttpPost(uri);
137144
httpPost.setHeaders(headers);
138145
if (batch.isEnableCompression()) {
139146
httpPost.setHeader("Content-Encoding", "gzip");

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
8080
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
8181
static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type";
8282
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
83+
static final String AUTO_EXTRACT_TIMESTAMP_CONF = "splunk.hec.auto.extract.timestamp";
84+
8385
//Headers
8486
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
8587
static final String HEADER_CUSTOM_CONF = "splunk.header.custom";
@@ -187,6 +189,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
187189
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
188190
static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...).";
189191
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
192+
static final String AUTO_EXTRACT_TIMESTAMP_DOC = "Sends timestamped events to HTTP Event Collector using the Splunk platform JSON event protocol when auto_extract_timestamp is set to \"true\" in the /event URL.";
190193

191194
static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
192195
static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
@@ -264,8 +267,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
264267
final String regex;
265268
final String timestampFormat;
266269
final int queueCapacity;
267-
268270
final String timeZone;
271+
final Boolean autoExtractTimestamp;
269272

270273
SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
271274
super(conf(), taskConfig);
@@ -324,6 +327,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
324327
validateRegexForTimestamp(regex);
325328
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
326329
validateQueueCapacity(queueCapacity);
330+
autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
327331
}
328332

329333

@@ -341,6 +345,7 @@ public static ConfigDef conf() {
341345
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
342346
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
343347
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
348+
.define(AUTO_EXTRACT_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW, AUTO_EXTRACT_TIMESTAMP_DOC)
344349
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
345350
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
346351
.define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
@@ -398,7 +403,8 @@ public HecConfig getHecConfig() {
398403
.setHasCustomTrustStore(hasTrustStorePath)
399404
.setKerberosPrincipal(kerberosUserPrincipal)
400405
.setKerberosKeytabPath(kerberosKeytabPath)
401-
.setConcurrentHecQueueCapacity(queueCapacity);
406+
.setConcurrentHecQueueCapacity(queueCapacity)
407+
.setAutoExtractTimestamp(autoExtractTimestamp);
402408
return config;
403409
}
404410

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.splunk.hecclient;
2+
3+
import org.junit.Assert;
4+
import org.junit.Test;
5+
6+
import java.net.URI;
7+
import java.util.Collections;
8+
9+
import static com.splunk.hecclient.JsonEventBatch.ENDPOINT;
10+
11+
public class HecURIBuilderTest {
12+
private static final String BASE_URL = "https://localhost:8088";
13+
private static final String TOKEN = "mytoken";
14+
15+
@Test
16+
public void testDefaultValues() {
17+
HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN);
18+
HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);
19+
20+
URI uri = builder.getURI(ENDPOINT);
21+
22+
Assert.assertEquals("https://localhost:8088/services/collector/event", uri.toString());
23+
}
24+
25+
@Test
26+
public void testAutoExtractTimestamp() {
27+
{
28+
HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN)
29+
.setAutoExtractTimestamp(true);
30+
HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);
31+
32+
URI uri = builder.getURI(ENDPOINT);
33+
34+
Assert.assertEquals("https://localhost:8088/services/collector/event?" +
35+
HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=true",
36+
uri.toString());
37+
}
38+
{
39+
HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN)
40+
.setAutoExtractTimestamp(false);
41+
HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);
42+
43+
URI uri = builder.getURI(ENDPOINT);
44+
45+
Assert.assertEquals("https://localhost:8088/services/collector/event?" +
46+
HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=false",
47+
uri.toString());
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)