From d7bb9316c2810fb8ef02839bc8c061786531a724 Mon Sep 17 00:00:00 2001
From: Don Tregonning
Date: Thu, 8 Nov 2018 18:17:00 -0800
Subject: [PATCH 1/2] header reworks

---
 dependency-reduced-pom.xml                   |  2 +-
 kafka-connect-splunk.iml                     |  4 ----
 pom.xml                                      |  2 +-
 .../splunk/kafka/connect/SplunkSinkTask.java | 19 ++++++++++++++++---
 4 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
index a1442fac..7471f220 100644
--- a/dependency-reduced-pom.xml
+++ b/dependency-reduced-pom.xml
@@ -4,7 +4,7 @@
   com.github.splunk.kafka.connect
   splunk-kafka-connect
   splunk-kafka-connect
-  v1.1.0
+  v1.1.1

diff --git a/kafka-connect-splunk.iml b/kafka-connect-splunk.iml
index 2258fa16..9d688376 100644
--- a/kafka-connect-splunk.iml
+++ b/kafka-connect-splunk.iml
@@ -12,10 +12,6 @@
-
-
-
-

diff --git a/pom.xml b/pom.xml
index 77d4b094..497976a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
   com.github.splunk.kafka.connect
   splunk-kafka-connect
-  v1.1.0
+  v1.1.1
   splunk-kafka-connect

diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 9b2c4dae..4e453acc 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -189,30 +189,43 @@ public String headerId(SinkRecord sinkRecord) {
         Header sourceHeader = headers.lastWithName(connectorConfig.headerSource);
         Header sourcetypeHeader = headers.lastWithName(connectorConfig.headerSourcetype);

+        Map<String, String> metas = connectorConfig.topicMetas.get(sinkRecord.topic());
+
         StringBuilder headerString = new StringBuilder();

         if(indexHeader != null) {
             headerString.append(indexHeader.value().toString());
+        } else {
+            headerString.append(metas.get("index"));
         }
         headerString.append(insertHeaderToken());
         if(hostHeader != null) {
             headerString.append(hostHeader.value().toString());
+        } else {
+            headerString.append("default-host");
         }
         headerString.append(insertHeaderToken());
         if(sourceHeader != null) {
             headerString.append(sourceHeader.value().toString());
+        } else {
+            headerString.append(metas.get("source"));
         }
         headerString.append(insertHeaderToken());
         if(sourcetypeHeader != null) {
             headerString.append(sourcetypeHeader.value().toString());
+        } else {
+            headerString.append(metas.get("sourcetype"));
         }
+        headerString.append(insertHeaderToken());
+
         return headerString.toString();
     }
@@ -263,13 +276,13 @@ private void send(final EventBatch batch) {
     }

     private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) {
-        String[] split = splunkSinkRecord.split("[$]{3}");
+        String[] split = splunkSinkRecord.split("[$]{3}", -1);

         return RawEventBatch.factory()
                 .setIndex(split[0])
-                .setSourcetype(split[1])
+                .setHost(split[1])
                 .setSource(split[2])
-                .setHost(split[3])
+                .setSourcetype(split[3])
                 .build();
     }

From f8a669d34845b6b2b259c3716263c9f39953a981 Mon Sep 17 00:00:00 2001
From: Don Tregonning
Date: Tue, 11 Dec 2018 15:30:10 -0800
Subject: [PATCH 2/2] Code improvements from INGEST-6610

---
 .../splunk/kafka/connect/SplunkSinkTask.java | 24 +++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 4e453acc..5f0ce394 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -25,6 +25,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.connect.header.Header;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;

 import org.slf4j.Logger;
@@ -42,6 +44,16 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback {
     private long lastFlushed = System.currentTimeMillis();
     private long threadId = Thread.currentThread().getId();

+    private static final String HOSTNAME;
+    static {
+        String h = null;
+        try {
+            h = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+        }
+        HOSTNAME = h;
+    }
+
     @Override
     public void start(Map<String, String> taskConfig) {
         connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
@@ -172,9 +184,10 @@ private void handleRecordsWithHeader(final Collection<SinkRecord> records) {

         Iterator<Map.Entry<String, ArrayList<SinkRecord>>> itr = recordsWithSameHeaders.entrySet().iterator();
         while(itr.hasNext()) {
-            Map.Entry set = itr.next();
-            String splunkSinkRecordKey = (String)set.getKey();
-            ArrayList recordArrayList = (ArrayList)set.getValue();
+            Map.Entry<String, ArrayList<SinkRecord>> set = itr.next();
+            String splunkSinkRecordKey = set.getKey();
+            ArrayList<SinkRecord> recordArrayList = set.getValue();
+
             EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey);
             sendEvents(recordArrayList, batch);
         }
@@ -354,8 +367,9 @@ private Event createHecEventFrom(final SinkRecord record) {
         if(connectorConfig.hecEventFormatted) {
             try {
                 event = objectMapper.readValue(record.value().toString(), JsonEvent.class);
+                event.addFields(connectorConfig.enrichments);
             } catch(Exception e) {
-                log.error("event does not follow correct HEC pre-formatted format", record.toString());
+                log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
                 event = createHECEventNonFormatted(record);
             }
         } else {
@@ -372,6 +386,8 @@ private Event createHecEventFrom(final SinkRecord record) {
             trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
             trackMetas.put("kafka_topic", record.topic());
             trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition()));
+            if (HOSTNAME != null)
+                trackMetas.put("kafka_connect_host", HOSTNAME);
             event.addFields(trackMetas);
         }
         event.validate();
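
Note on the raw-event header key (illustrative, not part of the patches): headerId() joins index, host, source, and sourcetype with the token returned by insertHeaderToken(), and createRawHeaderEventBatch() splits the key back apart. The "$$$" delimiter and the sample values in the sketch below are assumptions inferred from the split("[$]{3}") call. The sketch only demonstrates why patch 1 passes the -1 limit (String.split drops trailing empty fields by default, so a key with an empty last field would make split[3] throw ArrayIndexOutOfBoundsException) and why the builder calls were reordered to match the append order index, host, source, sourcetype.

    // Illustrative sketch only -- TOKEN and the sample values are assumptions,
    // not taken from the connector; insertHeaderToken() is the real source of
    // the delimiter in SplunkSinkTask.
    public class HeaderKeySplitDemo {
        private static final String TOKEN = "$$$";

        public static void main(String[] args) {
            // Example key whose last field is empty; the trailing delimiter
            // mirrors the final insertHeaderToken() append added in patch 1.
            String key = "main" + TOKEN + "web-01" + TOKEN + "/var/log/app.log" + TOKEN;

            // Default split(): trailing empty strings are discarded, only 3
            // fields survive, and indexing split[3] would throw
            // ArrayIndexOutOfBoundsException.
            System.out.println(key.split("[$]{3}").length);      // prints 3

            // split(..., -1): empty trailing fields are kept, as in patch 1.
            String[] split = key.split("[$]{3}", -1);
            System.out.println(split.length);                     // prints 4

            // Field order must mirror headerId(): index, host, source, sourcetype.
            System.out.printf("index=%s host=%s source=%s sourcetype=%s%n",
                    split[0], split[1], split[2], split[3]);
        }
    }

Run as-is, the sketch prints 3, then 4, then the four recovered fields with an empty sourcetype.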