diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index 6c52b247..bd4710fb 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -1,214 +1,214 @@ - - - 4.0.0 - com.github.splunk.kafka.connect - splunk-kafka-connect - splunk-kafka-connect - v1.1.1-alpha - - - - maven-shade-plugin - 3.1.0 - - - package - - shade - - - - - org.apache.kafka:kafka-clients - - - - - - - - - maven-compiler-plugin - 3.7.0 - - ${java.version} - ${java.version} - -Xlint:all - true - true - - - - maven-checkstyle-plugin - 2.17 - - - validate - validate - - check - - - google_checks.xml - - - - - - org.jacoco - jacoco-maven-plugin - 0.7.9 - - - pre-unit-test - - prepare-agent - - - ${project.build.directory}/coverage-reports/jacoco-ut.exec - surefireArgLine - - - - post-unit-test - test - - report - - - ${project.build.directory}/coverage-reports/jacoco-ut.exec - ${project.build.directory}/coverage-reports/jacoco-ut - - - - - - **/*com/splunk/hecclient/examples/**/* - - - - - maven-surefire-plugin - 2.19.1 - - - org.junit.platform - junit-platform-surefire-provider - ${junit.platform.version} - - - - ${surefireArgLine} - ${skip.unit.tests} - false - - **/Test*.java - **/*Test.java - **/*Tests.java - **/*TestCase.java - - - slow - - - - - - - - org.junit.jupiter - junit-jupiter-api - 5.0.1 - test - - - opentest4j - org.opentest4j - - - junit-platform-commons - org.junit.platform - - - - - junit - junit - 4.12 - test - - - hamcrest-core - org.hamcrest - - - - - org.junit.platform - junit-platform-launcher - 1.0.1 - test - - - junit-platform-engine - org.junit.platform - - - - - org.junit.jupiter - junit-jupiter-engine - 5.0.1 - test - - - junit-platform-engine - org.junit.platform - - - - - org.junit.vintage - junit-vintage-engine - 4.12.1 - test - - - junit-platform-engine - org.junit.platform - - - - - org.slf4j - slf4j-simple - 1.7.25 - test - - - org.apiguardian - apiguardian-api - 1.0.0 - test - - - - - - maven-jxr-plugin - 2.3 - - - - - 1.0.1 - 1.8 - ${junit.version}.1 - 5.0.1 - 1.8 - UTF-8 - 1.8 - 4.12 - - - + + + 4.0.0 + com.github.splunk.kafka.connect + splunk-kafka-connect + splunk-kafka-connect + v1.1.1 + + + + maven-shade-plugin + 3.1.0 + + + package + + shade + + + + + org.apache.kafka:kafka-clients + + + + + + + + + maven-compiler-plugin + 3.7.0 + + ${java.version} + ${java.version} + -Xlint:all + true + true + + + + maven-checkstyle-plugin + 2.17 + + + validate + validate + + check + + + google_checks.xml + + + + + + org.jacoco + jacoco-maven-plugin + 0.7.9 + + + pre-unit-test + + prepare-agent + + + ${project.build.directory}/coverage-reports/jacoco-ut.exec + surefireArgLine + + + + post-unit-test + test + + report + + + ${project.build.directory}/coverage-reports/jacoco-ut.exec + ${project.build.directory}/coverage-reports/jacoco-ut + + + + + + **/*com/splunk/hecclient/examples/**/* + + + + + maven-surefire-plugin + 2.19.1 + + + org.junit.platform + junit-platform-surefire-provider + ${junit.platform.version} + + + + ${surefireArgLine} + ${skip.unit.tests} + false + + **/Test*.java + **/*Test.java + **/*Tests.java + **/*TestCase.java + + + slow + + + + + + + + org.junit.jupiter + junit-jupiter-api + 5.0.1 + test + + + opentest4j + org.opentest4j + + + junit-platform-commons + org.junit.platform + + + + + junit + junit + 4.12 + test + + + hamcrest-core + org.hamcrest + + + + + org.junit.platform + junit-platform-launcher + 1.0.1 + test + + + junit-platform-engine + org.junit.platform + + + + + org.junit.jupiter + junit-jupiter-engine + 5.0.1 + test + + + junit-platform-engine + org.junit.platform + + + + + org.junit.vintage + junit-vintage-engine + 4.12.1 + test + + + junit-platform-engine + org.junit.platform + + + + + org.slf4j + slf4j-simple + 1.7.25 + test + + + org.apiguardian + apiguardian-api + 1.0.0 + test + + + + + + maven-jxr-plugin + 2.3 + + + + + 1.0.1 + 1.8 + ${junit.version}.1 + 5.0.1 + 1.8 + UTF-8 + 1.8 + 4.12 + + + diff --git a/kafka-connect-splunk.iml b/kafka-connect-splunk.iml index 2258fa16..9d688376 100644 --- a/kafka-connect-splunk.iml +++ b/kafka-connect-splunk.iml @@ -12,10 +12,6 @@ - - - - diff --git a/pom.xml b/pom.xml index 553f5631..497976a6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.splunk.kafka.connect splunk-kafka-connect - v1.1.1-alpha + v1.1.1 splunk-kafka-connect diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 9b2c4dae..5f0ce394 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.header.Header; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.*; import org.slf4j.Logger; @@ -42,6 +44,16 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback { private long lastFlushed = System.currentTimeMillis(); private long threadId = Thread.currentThread().getId(); + private static final String HOSTNAME; + static { + String h = null; + try { + h = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + } + HOSTNAME = h; + } + @Override public void start(Map taskConfig) { connectorConfig = new SplunkSinkConnectorConfig(taskConfig); @@ -172,9 +184,10 @@ private void handleRecordsWithHeader(final Collection records) { Iterator>> itr = recordsWithSameHeaders.entrySet().iterator(); while(itr.hasNext()) { - Map.Entry set = itr.next(); - String splunkSinkRecordKey = (String)set.getKey(); - ArrayList recordArrayList = (ArrayList)set.getValue(); + Map.Entry> set = itr.next(); + String splunkSinkRecordKey = set.getKey(); + ArrayList recordArrayList = set.getValue(); + EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey); sendEvents(recordArrayList, batch); } @@ -189,30 +202,43 @@ public String headerId(SinkRecord sinkRecord) { Header sourceHeader = headers.lastWithName(connectorConfig.headerSource); Header sourcetypeHeader = headers.lastWithName(connectorConfig.headerSourcetype); + Map metas = connectorConfig.topicMetas.get(sinkRecord.topic()); + + StringBuilder headerString = new StringBuilder(); if(indexHeader != null) { headerString.append(indexHeader.value().toString()); + } else { + headerString.append(metas.get("index")); } headerString.append(insertHeaderToken()); if(hostHeader != null) { headerString.append(hostHeader.value().toString()); + } else { + headerString.append("default-host"); } headerString.append(insertHeaderToken()); if(sourceHeader != null) { headerString.append(sourceHeader.value().toString()); + } else { + headerString.append(metas.get("source")); } headerString.append(insertHeaderToken()); if(sourcetypeHeader != null) { headerString.append(sourcetypeHeader.value().toString()); + } else { + headerString.append(metas.get("sourcetype")); } + headerString.append(insertHeaderToken()); + return headerString.toString(); } @@ -263,13 +289,13 @@ private void send(final EventBatch batch) { } private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) { - String[] split = splunkSinkRecord.split("[$]{3}"); + String[] split = splunkSinkRecord.split("[$]{3}", -1); return RawEventBatch.factory() .setIndex(split[0]) - .setSourcetype(split[1]) + .setHost(split[1]) .setSource(split[2]) - .setHost(split[3]) + .setSourcetype(split[3]) .build(); } @@ -341,8 +367,9 @@ private Event createHecEventFrom(final SinkRecord record) { if(connectorConfig.hecEventFormatted) { try { event = objectMapper.readValue(record.value().toString(), JsonEvent.class); + event.addFields(connectorConfig.enrichments); } catch(Exception e) { - log.error("event does not follow correct HEC pre-formatted format", record.toString()); + log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString()); event = createHECEventNonFormatted(record); } } else { @@ -359,6 +386,8 @@ private Event createHecEventFrom(final SinkRecord record) { trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp())); trackMetas.put("kafka_topic", record.topic()); trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition())); + if (HOSTNAME != null) + trackMetas.put("kafka_connect_host", HOSTNAME); event.addFields(trackMetas); } event.validate();