diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java
index 0fcfafa9..832f6af7 100644
--- a/src/main/java/com/splunk/hecclient/HecAckPoller.java
+++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java
@@ -209,9 +209,9 @@ public void stickySessionHandler(HecChannel channel) {
             log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
             if (pollerCallback != null) {
                 List expired = new ArrayList<>();
-                Iterator iter = channelBatches.entrySet().iterator();
+                Iterator<? extends Map.Entry<?, EventBatch>> iter = channelBatches.entrySet().iterator();
                 while(iter.hasNext()) {
-                    Map.Entry pair = (Map.Entry) iter.next();
+                    Map.Entry<?, EventBatch> pair = iter.next();
                     EventBatch batch = pair.getValue();
                     totalOutstandingEventBatches.decrementAndGet();
                     batch.fail();
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java
index b8f59745..49701e8f 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java
@@ -15,8 +15,13 @@
  */
 package com.splunk.kafka.connect;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * SplunkSinkRecord provides helper functionality to enable Header support for the Splunk Connect for Kafka, Namely
@@ -27,6 +32,7 @@
  * @since 1.1.0
  */
 public class SplunkSinkRecord {
+    private static final Logger log = LoggerFactory.getLogger(SplunkSinkRecord.class);
     Headers headers;
     SplunkSinkConnectorConfig connectorConfig;
     String splunkHeaderIndex = "";
@@ -48,7 +54,10 @@ public SplunkSinkRecord() {}
     public SplunkSinkRecord(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
         this.headers = record.headers();
-        setMetadataValues();
+        if(this.headers != null) {
+            log.debug("not null headers");
+            setMetadataValues();
+        }
     }
 
     /**
@@ -75,10 +84,54 @@ protected boolean compareRecordHeaders(SinkRecord record) {
     }
 
     private void setMetadataValues() {
-        splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString();
-        splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString();
-        splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString();
-        splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
+        log.debug("Made it to setMetadataValues");
+
+        splunkHeaderIndex = headerValueOrDefault(connectorConfig.headerIndex, splunkHeaderIndex);
+        splunkHeaderHost = headerValueOrDefault(connectorConfig.headerHost, splunkHeaderHost);
+        splunkHeaderSource = headerValueOrDefault(connectorConfig.headerSource, splunkHeaderSource);
+        splunkHeaderSourcetype = headerValueOrDefault(connectorConfig.headerSourcetype, splunkHeaderSourcetype);
+    }
+
+    /**
+     * Returns the value of the last header named {@code name} as a string, or
+     * {@code fallback} when that header is missing or its value is null.
+     */
+    private String headerValueOrDefault(String name, String fallback) {
+        Header header = this.headers.lastWithName(name);
+        if (header == null || header.value() == null) {
+            return fallback;
+        }
+        return header.value().toString();
+    }
+
+    /** Joins index, host, source and sourcetype with "$$$" into one key. */
+    public String id() {
+        return splunkHeaderIndex + "$$$" + splunkHeaderHost + "$$$"
+                + splunkHeaderSource + "$$$" + splunkHeaderSourcetype;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(splunkHeaderIndex)
+                .append(splunkHeaderHost)
+                .append(splunkHeaderSource)
+                .append(splunkHeaderSourcetype)
+                .toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof SplunkSinkRecord) {
+            final SplunkSinkRecord other = (SplunkSinkRecord) obj;
+            return new EqualsBuilder()
+                    .append(splunkHeaderIndex, other.splunkHeaderIndex)
+                    .append(splunkHeaderHost, other.splunkHeaderHost)
+                    .append(splunkHeaderSource, other.splunkHeaderSource)
+                    .append(splunkHeaderSourcetype, other.splunkHeaderSourcetype)
+                    .isEquals();
+        }
+        return false;
     }
 
     public Headers getHeaders() {
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 736bec8d..1e4422f1 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -141,7 +141,7 @@ private void preventTooManyOutstandingEvents() {
 
     private void handleRaw(final Collection records) {
         if(connectorConfig.headerSupport) {
-            handleRecordsWithHeader(records);
+            if(records != null) { handleRecordsWithHeader(records); }
         }
 
         else if (connectorConfig.hasMetaDataConfigured()) {
@@ -158,21 +158,59 @@ else if (connectorConfig.hasMetaDataConfigured()) {
     }
 
     private void handleRecordsWithHeader(final Collection records) {
-        List recordsWithSameHeaders = new ArrayList<>();
-        SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord();
+        // Bucket records by their header-derived key so every bucket shares one set of metadata.
+        Map<String, List<SinkRecord>> recordsWithSameHeaders = new HashMap<>();
 
         for (SinkRecord record : records) {
-            if (splunkSinkRecord.compareRecordHeaders(record)) {
-                recordsWithSameHeaders.add(record);
-                continue;
+            String key = headerId(record);
+            List<SinkRecord> bucket = recordsWithSameHeaders.get(key);
+            if (bucket == null) {
+                bucket = new ArrayList<>();
+                recordsWithSameHeaders.put(key, bucket);
             }
+            bucket.add(record);
+        }
 
-            EventBatch batch = createRawHeaderEventBatch(splunkSinkRecord);
-            sendEvents(recordsWithSameHeaders, batch);
-            recordsWithSameHeaders.clear();
-            recordsWithSameHeaders.add(record);
-            splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
+        for (Map.Entry<String, List<SinkRecord>> entry : recordsWithSameHeaders.entrySet()) {
+            EventBatch batch = createRawHeaderEventBatch(entry.getKey());
+            sendEvents(entry.getValue(), batch);
         }
+        log.debug("{} records have been bucketed in to {} batches", records.size(), recordsWithSameHeaders.size());
+    }
+
+    /**
+     * Builds the grouping key for a record: the index, host, source and sourcetype header
+     * values, in that order, separated by the "$$$" token. A header that is missing or has
+     * a null value contributes an empty field. Values containing "$$$" are not supported,
+     * because the key is later split on that token.
+     *
+     * @param sinkRecord record whose headers are read
+     * @return the composite key
+     */
+    public String headerId(SinkRecord sinkRecord) {
+        Headers headers = sinkRecord.headers();
+        String headerId = headerValueOrEmpty(headers, connectorConfig.headerIndex);
+        headerId = insertheaderToken(headerId);
+        headerId += headerValueOrEmpty(headers, connectorConfig.headerHost);
+        headerId = insertheaderToken(headerId);
+        headerId += headerValueOrEmpty(headers, connectorConfig.headerSource);
+        headerId = insertheaderToken(headerId);
+        headerId += headerValueOrEmpty(headers, connectorConfig.headerSourcetype);
+        return headerId;
+    }
+
+    // Returns the last value recorded for the named header, or "" when there is none.
+    private static String headerValueOrEmpty(Headers headers, String name) {
+        if (headers == null || headers.lastWithName(name) == null
+                || headers.lastWithName(name).value() == null) {
+            return "";
+        }
+        return headers.lastWithName(name).value().toString();
+    }
+
+    /** Appends the "$$$" field separator to the given key fragment. */
+    public String insertheaderToken(String id) {
+        return id + "$$$";
     }
 
     private void handleEvent(final Collection records) {
@@ -217,15 +255,19 @@ private void send(final EventBatch batch) {
         }
     }
 
-    private EventBatch createRawHeaderEventBatch(SplunkSinkRecord splunkSinkRecord) {
+    // Rebuilds batch metadata from a headerId() key laid out as index, host, source, sourcetype.
+    private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) {
+        // A limit of -1 keeps trailing empty fields, so absent headers still produce four parts.
+        String[] split = splunkSinkRecord.split("[$]{3}", -1);
+
         return RawEventBatch.factory()
-                .setIndex(splunkSinkRecord.getSplunkHeaderIndex())
-                .setSourcetype(splunkSinkRecord.getSplunkHeaderSourcetype())
-                .setSource(splunkSinkRecord.getSplunkHeaderSource())
-                .setHost(splunkSinkRecord.getSplunkHeaderHost())
+                .setIndex(split[0])
+                .setSourcetype(split[3])
+                .setSource(split[2])
+                .setHost(split[1])
                 .build();
-
     }
+
     // setup metadata on RawEventBatch
     private EventBatch createRawEventBatch(final TopicPartition tp) {
         if (tp == null) {
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
index 59c34170..5a0448a8 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
@@ -226,6 +226,11 @@ public void putWithRawAndAckWithoutMeta() {
         putWithSuccess(true, false);
     }
 
+    @Test
+    public void handleRecordsWithHeaders() {
+        // TODO: placeholder with no assertions yet; add coverage for header-based bucketing.
+    }
+
     private void putWithSuccess(boolean raw, boolean withMeta) {
         int batchSize = 100;
         int total = 1000;