Merged
src/main/java/com/splunk/hecclient/HecAckPoller.java (4 changes: 2 additions & 2 deletions)

@@ -209,9 +209,9 @@ public void stickySessionHandler(HecChannel channel) {
         log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
         if (pollerCallback != null) {
             List<EventBatch> expired = new ArrayList<>();
-            Iterator iter = channelBatches.entrySet().iterator();
+            Iterator<Map.Entry<Long, EventBatch>> iter = channelBatches.entrySet().iterator();
             while (iter.hasNext()) {
-                Map.Entry<Long, EventBatch> pair = (Map.Entry) iter.next();
+                Map.Entry<Long, EventBatch> pair = iter.next();
                 EventBatch batch = pair.getValue();
                 totalOutstandingEventBatches.decrementAndGet();
                 batch.fail();
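For context, a standalone sketch of the typed-iterator idiom this hunk adopts (class and values here are hypothetical, not from the connector). The generic entrySet() iterator removes the unchecked (Map.Entry) cast, and keeping an explicit iterator rather than a for-each loop still allows iter.remove() during traversal:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class TypedIteratorDemo {
        public static void main(String[] args) {
            Map<Long, String> batches = new HashMap<>();
            batches.put(1L, "batch-1");
            batches.put(2L, "batch-2");

            // next() already returns Map.Entry<Long, String>, so no cast is needed
            Iterator<Map.Entry<Long, String>> iter = batches.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<Long, String> pair = iter.next();
                System.out.println(pair.getKey() + " -> " + pair.getValue());
            }
        }
    }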
src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java (57 changes: 52 additions & 5 deletions)

@@ -15,8 +15,12 @@
  */
 package com.splunk.kafka.connect;

+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * SplunkSinkRecord provides helper functionality to enable Header support for Splunk Connect for Kafka, namely
@@ -27,6 +31,7 @@
  * @since 1.1.0
  */
 public class SplunkSinkRecord {
+    private static final Logger log = LoggerFactory.getLogger(SplunkSinkRecord.class);
     Headers headers;
     SplunkSinkConnectorConfig connectorConfig;
     String splunkHeaderIndex = "";
@@ -48,7 +53,10 @@ public SplunkSinkRecord() {}
     public SplunkSinkRecord(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
         this.headers = record.headers();
-        setMetadataValues();
+        if (this.headers != null) {
+            log.debug("record has headers; extracting Splunk metadata");
+            setMetadataValues();
+        }
     }

 /**
@@ -75,10 +83,49 @@ protected boolean compareRecordHeaders(SinkRecord record) {
     }

     private void setMetadataValues() {
-        splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString();
-        splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString();
-        splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString();
-        splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
+        log.debug("setting Splunk metadata values from record headers");
+
+        // Guard both the header and its value: lastWithName() returns null for
+        // an absent header, and a present header may still carry a null value.
+        if (this.headers.lastWithName(connectorConfig.headerIndex) != null
+                && this.headers.lastWithName(connectorConfig.headerIndex).value() != null) {
+            splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString();
+        }
+        if (this.headers.lastWithName(connectorConfig.headerHost) != null
+                && this.headers.lastWithName(connectorConfig.headerHost).value() != null) {
+            splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString();
+        }
+        if (this.headers.lastWithName(connectorConfig.headerSource) != null
+                && this.headers.lastWithName(connectorConfig.headerSource).value() != null) {
+            splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString();
+        }
+        if (this.headers.lastWithName(connectorConfig.headerSourcetype) != null
+                && this.headers.lastWithName(connectorConfig.headerSourcetype).value() != null) {
+            splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
+        }
     }

+    public String id() {
+        return splunkHeaderIndex + "$$$" + splunkHeaderHost + "$$$"
+                + splunkHeaderSource + "$$$" + splunkHeaderSourcetype;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(splunkHeaderIndex)
+                .append(splunkHeaderHost)
+                .append(splunkHeaderSource)
+                .append(splunkHeaderSourcetype)
+                .toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof SplunkSinkRecord) {
+            final SplunkSinkRecord other = (SplunkSinkRecord) obj;
+            return new EqualsBuilder()
+                    .append(splunkHeaderIndex, other.splunkHeaderIndex)
+                    .append(splunkHeaderHost, other.splunkHeaderHost)
+                    .append(splunkHeaderSource, other.splunkHeaderSource)
+                    .append(splunkHeaderSourcetype, other.splunkHeaderSourcetype)
+                    .isEquals();
+        }
+        return false;
+    }
+
     public Headers getHeaders() {
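Grouping only works if equal header tuples also collide in hash-based collections, which is the contract the new hashCode()/equals() pair establishes. A self-contained sketch of the same builder-based pattern (class name and values hypothetical; commons-lang3 assumed on the classpath):

    import org.apache.commons.lang3.builder.EqualsBuilder;
    import org.apache.commons.lang3.builder.HashCodeBuilder;

    public class HeaderKey {
        private final String index, host, source, sourcetype;

        public HeaderKey(String index, String host, String source, String sourcetype) {
            this.index = index;
            this.host = host;
            this.source = source;
            this.sourcetype = sourcetype;
        }

        @Override
        public int hashCode() {
            // Same fields in the same order as equals(): the two must agree
            return new HashCodeBuilder().append(index).append(host)
                    .append(source).append(sourcetype).toHashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof HeaderKey)) {
                return false;
            }
            HeaderKey other = (HeaderKey) obj;
            return new EqualsBuilder().append(index, other.index).append(host, other.host)
                    .append(source, other.source).append(sourcetype, other.sourcetype).isEquals();
        }

        public static void main(String[] args) {
            HeaderKey a = new HeaderKey("main", "host-1", "syslog", "st-1");
            HeaderKey b = new HeaderKey("main", "host-1", "syslog", "st-1");
            System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
        }
    }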
src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java (78 changes: 61 additions & 17 deletions)

@@ -141,7 +141,7 @@ private void preventTooManyOutstandingEvents() {

     private void handleRaw(final Collection<SinkRecord> records) {
         if (connectorConfig.headerSupport) {
-            handleRecordsWithHeader(records);
+            if (records != null) { handleRecordsWithHeader(records); }
         }

         else if (connectorConfig.hasMetaDataConfigured()) {
@@ -158,21 +158,63 @@ else if (connectorConfig.hasMetaDataConfigured()) {
     }

     private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
-        List<SinkRecord> recordsWithSameHeaders = new ArrayList<>();
-        SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord();
+        HashMap<String, ArrayList<SinkRecord>> recordsWithSameHeaders = new HashMap<>();

         for (SinkRecord record : records) {
-            if (splunkSinkRecord.compareRecordHeaders(record)) {
-                recordsWithSameHeaders.add(record);
-                continue;
-            }
-
-            EventBatch batch = createRawHeaderEventBatch(splunkSinkRecord);
-            sendEvents(recordsWithSameHeaders, batch);
-            recordsWithSameHeaders.clear();
-            recordsWithSameHeaders.add(record);
-            splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
+            String key = headerId(record);
+            recordsWithSameHeaders.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
         }

+        int index = 0;
+        for (Map.Entry<String, ArrayList<SinkRecord>> entry : recordsWithSameHeaders.entrySet()) {
+            EventBatch batch = createRawHeaderEventBatch(entry.getKey());
+            sendEvents(entry.getValue(), batch);
+            index++;
+        }
+        log.debug("{} records have been bucketed into {} batches", records.size(), index);
+    }
+
+    public String headerId(SinkRecord sinkRecord) {
+        Headers headers = sinkRecord.headers();
+        String headerId = "";
+
+        if (headers.lastWithName(connectorConfig.headerIndex) != null
+                && headers.lastWithName(connectorConfig.headerIndex).value() != null) {
+            headerId += headers.lastWithName(connectorConfig.headerIndex).value().toString();
+        }
+
+        headerId = insertHeaderToken(headerId);
+
+        if (headers.lastWithName(connectorConfig.headerHost) != null
+                && headers.lastWithName(connectorConfig.headerHost).value() != null) {
+            headerId += headers.lastWithName(connectorConfig.headerHost).value().toString();
+        }
+
+        headerId = insertHeaderToken(headerId);
+
+        if (headers.lastWithName(connectorConfig.headerSource) != null
+                && headers.lastWithName(connectorConfig.headerSource).value() != null) {
+            headerId += headers.lastWithName(connectorConfig.headerSource).value().toString();
+        }
+
+        headerId = insertHeaderToken(headerId);
+
+        if (headers.lastWithName(connectorConfig.headerSourcetype) != null
+                && headers.lastWithName(connectorConfig.headerSourcetype).value() != null) {
+            headerId += headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
+        }
+
+        return headerId;
+    }
+
+    // Appends the "$$$" field separator that createRawHeaderEventBatch() uses to split the id apart.
+    public String insertHeaderToken(String id) {
+        return id + "$$$";
+    }

     private void handleEvent(final Collection<SinkRecord> records) {
@@ -217,15 +259,17 @@ private void send(final EventBatch batch) {
         }
     }

-    private EventBatch createRawHeaderEventBatch(SplunkSinkRecord splunkSinkRecord) {
+    private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) {
+        // Negative limit keeps trailing empty fields, so all four positions
+        // exist even when the trailing sourcetype header was absent.
+        String[] split = splunkSinkRecord.split("[$]{3}", -1);
+
         return RawEventBatch.factory()
-                .setIndex(splunkSinkRecord.getSplunkHeaderIndex())
-                .setSourcetype(splunkSinkRecord.getSplunkHeaderSourcetype())
-                .setSource(splunkSinkRecord.getSplunkHeaderSource())
-                .setHost(splunkSinkRecord.getSplunkHeaderHost())
+                .setIndex(split[0])      // headerId() order: index, host, source, sourcetype
+                .setHost(split[1])
+                .setSource(split[2])
+                .setSourcetype(split[3])
                 .build();
     }

     // setup metadata on RawEventBatch
     private EventBatch createRawEventBatch(final TopicPartition tp) {
         if (tp == null) {
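headerId() and createRawHeaderEventBatch() communicate through a single String, with fields joined in the order index, host, source, sourcetype and "$$$" as the separator. Two subtleties of that scheme: String.split drops trailing empty fields unless given a negative limit (which matters whenever the trailing sourcetype header is absent), and it assumes no header value itself contains "$$$". A runnable demonstration with hypothetical values:

    import java.util.Arrays;

    public class HeaderKeySplitDemo {
        public static void main(String[] args) {
            // All four fields present: index, host, source, sourcetype
            String full = "main$$$host-1$$$syslog$$$st-1";
            System.out.println(Arrays.toString(full.split("[$]{3}")));
            // prints [main, host-1, syslog, st-1]

            // Trailing sourcetype absent: default split drops the empty field,
            // so indexing split[3] would throw ArrayIndexOutOfBoundsException
            String partial = "main$$$host-1$$$syslog$$$";
            System.out.println(partial.split("[$]{3}").length);      // 3

            // A negative limit preserves trailing empty fields: always 4 parts
            System.out.println(partial.split("[$]{3}", -1).length);  // 4
        }
    }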
src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java (6 additions)

@@ -226,6 +226,12 @@ public void putWithRawAndAckWithoutMeta() {
         putWithSuccess(true, false);
     }

+    @Test
+    public void handleRecordsWithHeaders() {
+        // TODO: placeholder with no assertions yet; one possible shape is sketched below
+    }
+
     private void putWithSuccess(boolean raw, boolean withMeta) {
         int batchSize = 100;
         int total = 1000;
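The new test above is still an empty stub. One possible shape for it, sketched under assumptions: JUnit 4 as in the surrounding tests, and hypothetical header names ("splunk.header.index", "splunk.header.host") standing in for whatever the connector config actually uses; the keyOf() helper only mirrors the spirit of headerId(), not its exact implementation:

    import org.apache.kafka.connect.header.Headers;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.junit.Assert;
    import org.junit.Test;

    public class HeaderGroupingSketchTest {

        // Builds a composite key from the given headers, in the spirit of headerId()
        private String keyOf(SinkRecord record, String... headerNames) {
            StringBuilder id = new StringBuilder();
            Headers headers = record.headers();
            for (String name : headerNames) {
                if (headers.lastWithName(name) != null && headers.lastWithName(name).value() != null) {
                    id.append(headers.lastWithName(name).value().toString());
                }
                id.append("$$$");
            }
            return id.toString();
        }

        @Test
        public void recordsWithIdenticalHeadersShareAGroupingKey() {
            SinkRecord a = new SinkRecord("topic", 0, null, null, null, "event-a", 0);
            SinkRecord b = new SinkRecord("topic", 0, null, null, null, "event-b", 1);
            for (SinkRecord r : new SinkRecord[]{a, b}) {
                r.headers().addString("splunk.header.index", "main");
                r.headers().addString("splunk.header.host", "host-1");
            }
            String[] names = {"splunk.header.index", "splunk.header.host"};
            Assert.assertEquals(keyOf(a, names), keyOf(b, names));
        }
    }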