From 4e7ba60feb710f691cbf681dc8496b879d454017 Mon Sep 17 00:00:00 2001 From: Don Tregonning Date: Wed, 22 Aug 2018 19:06:57 -0700 Subject: [PATCH 1/3] issue-165 Modify raw batching functionality to use hashmap instad of list --- .../kafka/connect/SplunkSinkRecord.java | 26 +++++++++++++++ .../splunk/kafka/connect/SplunkSinkTask.java | 32 ++++++++++++------- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java index b8f59745..83f56d34 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java @@ -15,6 +15,8 @@ */ package com.splunk.kafka.connect; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.sink.SinkRecord; @@ -81,6 +83,30 @@ private void setMetadataValues() { splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString(); } + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(splunkHeaderIndex) + .append(splunkHeaderHost) + .append(splunkHeaderSource) + .append(splunkHeaderSourcetype) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SplunkSinkRecord) { + final SplunkSinkRecord other = (SplunkSinkRecord) obj; + return new EqualsBuilder() + .append(splunkHeaderIndex, other.splunkHeaderIndex) + .append(splunkHeaderHost, other.splunkHeaderHost) + .append(splunkHeaderSource, other.splunkHeaderSource) + .append(splunkHeaderSourcetype, other.splunkHeaderSourcetype) + .isEquals(); + } + return false; + } + public Headers getHeaders() { return headers; } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 736bec8d..d46be050 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -29,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLEngineResult; + public final class SplunkSinkTask extends SinkTask implements PollerCallback { private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class); private static final long flushWindow = 30 * 1000; // 30 seconds @@ -158,20 +160,26 @@ else if (connectorConfig.hasMetaDataConfigured()) { } private void handleRecordsWithHeader(final Collection records) { - List recordsWithSameHeaders = new ArrayList<>(); - SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord(); - + HashMap> recordsWithSameHeaders = new HashMap<>(); + SplunkSinkRecord splunkSinkRecord; for (SinkRecord record : records) { - if (splunkSinkRecord.compareRecordHeaders(record)) { - recordsWithSameHeaders.add(record); - continue; - } - - EventBatch batch = createRawHeaderEventBatch(splunkSinkRecord); - sendEvents(recordsWithSameHeaders, batch); - recordsWithSameHeaders.clear(); - recordsWithSameHeaders.add(record); splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig); + + if (!recordsWithSameHeaders.containsKey(splunkSinkRecord)) { + recordsWithSameHeaders.put(splunkSinkRecord, new ArrayList<>()); + } + ArrayList recordList = recordsWithSameHeaders.get(record); + recordList.add(record); + recordsWithSameHeaders.put(splunkSinkRecord, recordList); + } + + Iterator itr = recordsWithSameHeaders.entrySet().iterator(); + while(itr.hasNext()) { + Map.Entry set = (Map.Entry)itr.next(); + SplunkSinkRecord splunkSinkRecordKey = (SplunkSinkRecord)set.getKey(); + ArrayList recordArrayList = (ArrayList)set.getValue(); + EventBatch batch = createRawHeaderEventBatch(1); + sendEvents(recordArrayList, batch); } } From 4e1372afb4bef93869b556dbecadb163b03442b8 Mon Sep 17 00:00:00 2001 From: Don Tregonning Date: Thu, 23 Aug 2018 17:02:46 -0700 Subject: [PATCH 2/3] WIP --- .../kafka/connect/SplunkSinkRecord.java | 30 ++++++++-- .../splunk/kafka/connect/SplunkSinkTask.java | 55 ++++++++++++++++--- .../kafka/connect/SplunkSinkTaskTest.java | 6 ++ 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java index 83f56d34..d209e853 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java @@ -19,6 +19,8 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SplunkSinkRecord provides helper functionality to enable Header support for the Splunk Connect for Kafka, Namely @@ -29,6 +31,7 @@ * @since 1.1.0 */ public class SplunkSinkRecord { + private static final Logger log = LoggerFactory.getLogger(SplunkSinkRecord.class); Headers headers; SplunkSinkConnectorConfig connectorConfig; String splunkHeaderIndex = ""; @@ -50,7 +53,10 @@ public SplunkSinkRecord() {} public SplunkSinkRecord(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; this.headers = record.headers(); - setMetadataValues(); + if(this.headers != null) { + log.info("not null headers"); + setMetadataValues(); + } } /** @@ -77,10 +83,24 @@ protected boolean compareRecordHeaders(SinkRecord record) { } private void setMetadataValues() { - splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString(); - splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString(); - splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString(); - splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString(); + log.info("Made it to setMetadataValues"); + + if(this.headers.lastWithName(connectorConfig.headerIndex).value() != null) { + splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString(); + } + if(this.headers.lastWithName(connectorConfig.headerHost).value() != null) { + splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString(); + } + if(this.headers.lastWithName(connectorConfig.headerSource).value() != null) { + splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString(); + } + if(this.headers.lastWithName(connectorConfig.headerSourcetype).value() != null) { + splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString(); + } + } + + public String id() { + return } @Override diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index d46be050..bbf67ce4 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -16,6 +16,7 @@ package com.splunk.kafka.connect; import com.splunk.hecclient.*; +import com.sun.tools.corba.se.idl.StringGen; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.RetriableException; @@ -143,7 +144,7 @@ private void preventTooManyOutstandingEvents() { private void handleRaw(final Collection records) { if(connectorConfig.headerSupport) { - handleRecordsWithHeader(records); + if(records != null) { handleRecordsWithHeader(records); } } else if (connectorConfig.hasMetaDataConfigured()) { @@ -160,27 +161,67 @@ else if (connectorConfig.hasMetaDataConfigured()) { } private void handleRecordsWithHeader(final Collection records) { - HashMap> recordsWithSameHeaders = new HashMap<>(); + log.info("Inside handle records"); + + HashMap> recordsWithSameHeaders = new HashMap<>(); SplunkSinkRecord splunkSinkRecord; for (SinkRecord record : records) { - splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig); + log.info("Inside loop"); - if (!recordsWithSameHeaders.containsKey(splunkSinkRecord)) { - recordsWithSameHeaders.put(splunkSinkRecord, new ArrayList<>()); + // splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig); + String key = headerId(record); + if (!recordsWithSameHeaders.containsKey(key)) { + recordsWithSameHeaders.put(key, new ArrayList<>()); } ArrayList recordList = recordsWithSameHeaders.get(record); recordList.add(record); - recordsWithSameHeaders.put(splunkSinkRecord, recordList); + recordsWithSameHeaders.put(key, recordList); } + int index = 0; Iterator itr = recordsWithSameHeaders.entrySet().iterator(); while(itr.hasNext()) { + log.info("Sending Log {}", index); Map.Entry set = (Map.Entry)itr.next(); SplunkSinkRecord splunkSinkRecordKey = (SplunkSinkRecord)set.getKey(); ArrayList recordArrayList = (ArrayList)set.getValue(); - EventBatch batch = createRawHeaderEventBatch(1); + EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey); sendEvents(recordArrayList, batch); + index++; + } + } + + public String headerId(SinkRecord sinkRecord) { + Headers headers = sinkRecord.headers(); + String headerId = ""; + + if(headers.lastWithName(connectorConfig.headerIndex).value() != null) { + headerId += headers.lastWithName(connectorConfig.headerIndex).value().toString(); } + + insertheaderToken(headerId); + + if(headers.lastWithName(connectorConfig.headerHost).value() != null) { + headerId += headers.lastWithName(connectorConfig.headerHost).value().toString(); + } + + insertheaderToken(headerId); + + if(headers.lastWithName(connectorConfig.headerSource).value() != null) { + headerId += headers.lastWithName(connectorConfig.headerSource).value().toString(); + } + + insertheaderToken(headerId); + + if(headers.lastWithName(connectorConfig.headerSourcetype).value() != null) { + headerId += headers.lastWithName(connectorConfig.headerSourcetype).value().toString(); + } + + return headerId; + } + + public String insertheaderToken(String id) { + return id += "$$$"; } private void handleEvent(final Collection records) { diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index 59c34170..5a0448a8 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -226,6 +226,12 @@ public void putWithRawAndAckWithoutMeta() { putWithSuccess(true, false); } + @Test + public void handleRecordsWithHeaders() { + + + } + private void putWithSuccess(boolean raw, boolean withMeta) { int batchSize = 100; int total = 1000; From 7906cc01106c38d95ef89a0ed578503d743686c9 Mon Sep 17 00:00:00 2001 From: Don Tregonning Date: Mon, 27 Aug 2018 12:27:12 -0700 Subject: [PATCH 3/3] issue-165 - fixed categorization of like Headers --- .../com/splunk/hecclient/HecAckPoller.java | 4 +- .../kafka/connect/SplunkSinkRecord.java | 3 +- .../splunk/kafka/connect/SplunkSinkTask.java | 53 +++++++++---------- 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java index 0fcfafa9..832f6af7 100644 --- a/src/main/java/com/splunk/hecclient/HecAckPoller.java +++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java @@ -209,9 +209,9 @@ public void stickySessionHandler(HecChannel channel) { log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId); if (pollerCallback != null) { List expired = new ArrayList<>(); - Iterator iter = channelBatches.entrySet().iterator(); + Iterator> iter = channelBatches.entrySet().iterator(); while(iter.hasNext()) { - Map.Entry pair = (Map.Entry) iter.next(); + Map.Entry pair = iter.next(); EventBatch batch = pair.getValue(); totalOutstandingEventBatches.decrementAndGet(); batch.fail(); diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java index d209e853..49701e8f 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java @@ -100,7 +100,8 @@ private void setMetadataValues() { } public String id() { - return + return splunkHeaderIndex + "$$$" + splunkHeaderHost + "$$$" + + splunkHeaderSource + "$$$" + splunkHeaderSourcetype; } @Override diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index bbf67ce4..1e4422f1 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -16,7 +16,6 @@ package com.splunk.kafka.connect; import com.splunk.hecclient.*; -import com.sun.tools.corba.se.idl.StringGen; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.RetriableException; @@ -30,8 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLEngineResult; - public final class SplunkSinkTask extends SinkTask implements PollerCallback { private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class); private static final long flushWindow = 30 * 1000; // 30 seconds @@ -161,59 +158,55 @@ else if (connectorConfig.hasMetaDataConfigured()) { } private void handleRecordsWithHeader(final Collection records) { - log.info("Inside handle records"); - HashMap> recordsWithSameHeaders = new HashMap<>(); - SplunkSinkRecord splunkSinkRecord; - for (SinkRecord record : records) { - log.info("Inside loop"); - // splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig); + for (SinkRecord record : records) { String key = headerId(record); if (!recordsWithSameHeaders.containsKey(key)) { - recordsWithSameHeaders.put(key, new ArrayList<>()); + ArrayList recordList = new ArrayList(); + recordsWithSameHeaders.put(key, recordList); } - ArrayList recordList = recordsWithSameHeaders.get(record); + ArrayList recordList = recordsWithSameHeaders.get(key); recordList.add(record); recordsWithSameHeaders.put(key, recordList); } int index = 0; - Iterator itr = recordsWithSameHeaders.entrySet().iterator(); + Iterator>> itr = recordsWithSameHeaders.entrySet().iterator(); while(itr.hasNext()) { - log.info("Sending Log {}", index); - Map.Entry set = (Map.Entry)itr.next(); - SplunkSinkRecord splunkSinkRecordKey = (SplunkSinkRecord)set.getKey(); + Map.Entry set = itr.next(); + String splunkSinkRecordKey = (String)set.getKey(); ArrayList recordArrayList = (ArrayList)set.getValue(); EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey); sendEvents(recordArrayList, batch); index++; } + log.debug("{} records have been bucketed in to {} batches",records.size(), index); } public String headerId(SinkRecord sinkRecord) { Headers headers = sinkRecord.headers(); String headerId = ""; - if(headers.lastWithName(connectorConfig.headerIndex).value() != null) { + if(headers.lastWithName(connectorConfig.headerIndex) != null) { headerId += headers.lastWithName(connectorConfig.headerIndex).value().toString(); } - insertheaderToken(headerId); + headerId = insertheaderToken(headerId); - if(headers.lastWithName(connectorConfig.headerHost).value() != null) { + if(headers.lastWithName(connectorConfig.headerHost) != null) { headerId += headers.lastWithName(connectorConfig.headerHost).value().toString(); } - insertheaderToken(headerId); + headerId = insertheaderToken(headerId); - if(headers.lastWithName(connectorConfig.headerSource).value() != null) { + if(headers.lastWithName(connectorConfig.headerSource) != null) { headerId += headers.lastWithName(connectorConfig.headerSource).value().toString(); } - insertheaderToken(headerId); + headerId = insertheaderToken(headerId); - if(headers.lastWithName(connectorConfig.headerSourcetype).value() != null) { + if(headers.lastWithName(connectorConfig.headerSourcetype) != null) { headerId += headers.lastWithName(connectorConfig.headerSourcetype).value().toString(); } @@ -221,7 +214,7 @@ public String headerId(SinkRecord sinkRecord) { } public String insertheaderToken(String id) { - return id += "$$$"; + return id + "$$$"; } private void handleEvent(final Collection records) { @@ -266,15 +259,17 @@ private void send(final EventBatch batch) { } } - private EventBatch createRawHeaderEventBatch(SplunkSinkRecord splunkSinkRecord) { + private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) { + String[] split = splunkSinkRecord.split("[$]{3}"); + return RawEventBatch.factory() - .setIndex(splunkSinkRecord.getSplunkHeaderIndex()) - .setSourcetype(splunkSinkRecord.getSplunkHeaderSourcetype()) - .setSource(splunkSinkRecord.getSplunkHeaderSource()) - .setHost(splunkSinkRecord.getSplunkHeaderHost()) + .setIndex(split[0]) + .setSourcetype(split[1]) + .setSource(split[2]) + .setHost(split[3]) .build(); - } + // setup metadata on RawEventBatch private EventBatch createRawEventBatch(final TopicPartition tp) { if (tp == null) {