Commit 8953a55

Author: Don Tregonning (committed)
Updates based on review
1 parent 8c709a3 commit 8953a55

File tree: 5 files changed, +94 −107 lines changed

5 files changed

+94
-107
lines changed

src/main/java/com/splunk/hecclient/JsonEventBatch.java

Lines changed: 1 addition & 2 deletions
@@ -61,8 +61,7 @@ public boolean equals(Object obj) {
             return new EqualsBuilder()
                     .append(endpoint, other.endpoint)
                     .isEquals();
-        } else {
-            return false;
         }
+        return false;
     }
 }
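
For context, the simplified equals() now reads roughly as below. This is a sketch: the instanceof guard and cast immediately above the hunk are assumed from the usual Apache Commons Lang EqualsBuilder pattern and are not shown in the diff.

```java
// Sketch of the simplified method; the instanceof guard and cast are
// assumed, not shown in the hunk above.
// Requires org.apache.commons.lang3.builder.EqualsBuilder.
@Override
public boolean equals(Object obj) {
    if (obj instanceof JsonEventBatch) {
        final JsonEventBatch other = (JsonEventBatch) obj;
        return new EqualsBuilder()
                .append(endpoint, other.endpoint)
                .isEquals();
    }
    return false; // early return replaces the deleted else branch
}
```

Flattening the else branch into an early return keeps the happy path unindented without changing behavior.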

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 5 additions & 3 deletions
@@ -146,14 +146,16 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         + "latency metadata will be indexed along with raw data. This setting only works in "
         + "conjunction with /event HEC endpoint (\"splunk.hec.raw\" : \"false\"). By default"
         + ", this is set to false.";
-    static final String HEC_EVENT_FORMATTED_DOC = "Ensures events already formatted correctly for HEC are parsed "
-        + "correctly in the connector, before being sent to Splunk";
+    static final String HEC_EVENT_FORMATTED_DOC = "Ensures events that are pre-formatted into the properly formatted HEC "
+        + "JSON format as per http://dev.splunk.com/view/event-collector/SP-CAAAE6P have meta-data and event data indexed "
+        + "correctly by Splunk.";
     // TBD
     static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
     static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";

     static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
-    static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and pass them ";
+    static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
+        + "to each event if present. Custom headers are configured separated by comma for multiple headers. ex, (\"custom_header_1,custom_header_2,custom_header_3\").";
     static final String HEADER_INDEX_DOC = "Header to use for Splunk Header Index";
     static final String HEADER_SOURCE_DOC = "Header to use for Splunk Header Source";
     static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype";
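
The new HEADER_CUSTOM_DOC describes a comma-separated list of custom header names. A minimal configuration sketch follows; the property keys splunk.header.support and splunk.header.custom are assumptions, since the hunk shows only the doc strings and not the ConfigDef key names.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderConfigExample {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Assumed property keys -- verify against the connector's ConfigDef.
        config.put("splunk.header.support", "true"); // enable header-based metadata override
        config.put("splunk.header.custom", "custom_header_1,custom_header_2,custom_header_3");
        System.out.println(config);
    }
}
```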

src/main/java/com/splunk/kafka/connect/KafkaHeaderUtility.java renamed to src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java

Lines changed: 17 additions & 24 deletions
@@ -19,22 +19,22 @@
 import org.apache.kafka.connect.sink.SinkRecord;

 /**
- * KafkaHeaderUtility provides helper functionality to enable Header support for the Splunk Connect for Kafka, Namely
+ * SplunkSinkRecord provides helper functionality to enable Header support for the Splunk Connect for Kafka, Namely
  * Header functionality introspection and comparison.
  * <p>
  *
  * @version 1.1.0
  * @since 1.1.0
  */
-public class KafkaHeaderUtility {
+public class SplunkSinkRecord {
     Headers headers;
     SplunkSinkConnectorConfig connectorConfig;
     String splunkHeaderIndex = "";
     String splunkHeaderHost = "";
     String splunkHeaderSource = "";
     String splunkHeaderSourcetype = "";

-    public KafkaHeaderUtility() {}
+    public SplunkSinkRecord() {}

     /**
      * Creates a new Kafka Header utility object. Will take a Kafka SinkRecord and Splunk Sink Connector configuration
@@ -45,10 +45,10 @@ public KafkaHeaderUtility() {}
      * @version 1.1.0
      * @since 1.1.0
      */
-    public KafkaHeaderUtility(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) {
+    public SplunkSinkRecord(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) {
         this.connectorConfig = connectorConfig;
         this.headers = record.headers();
-        setMetadataValues(record.headers());
+        setMetadataValues();
     }

     /**
@@ -60,46 +60,39 @@ public KafkaHeaderUtility(SinkRecord record, SplunkSinkConnectorConfig connector
      * @version 1.1.0
      * @since 1.1.0
      */
-    public boolean compareRecordHeaders(SinkRecord record) {
+    protected boolean compareRecordHeaders(SinkRecord record) {
         headers = record.headers();
+        String index = headers.lastWithName(connectorConfig.headerIndex).value().toString();
+        String host = headers.lastWithName(connectorConfig.headerHost).value().toString();
+        String source = headers.lastWithName(connectorConfig.headerSource).value().toString();
+        String sourcetype = headers.lastWithName(connectorConfig.headerSourcetype).value().toString();

-        if(splunkHeaderIndex.equals(headers.lastWithName(connectorConfig.headerIndex).value().toString()) &&
-           splunkHeaderHost.equals(headers.lastWithName(connectorConfig.headerHost).value().toString()) &&
-           splunkHeaderSource.equals(headers.lastWithName(connectorConfig.headerSource).value().toString()) &&
-           splunkHeaderSourcetype.equals(headers.lastWithName(connectorConfig.headerSourcetype).value().toString())) {
+        if(splunkHeaderIndex.equals(index) && splunkHeaderHost.equals(host) &&
+           splunkHeaderSource.equals(source) && splunkHeaderSourcetype.equals(sourcetype)) {
             return true;
         }
         return false;
     }

-    public void setHeaders(Headers headers) {
-        this.headers = headers;
-        setMetadataValues(headers);
-    }
-
-    public void setMetadataValues(Headers headers) {
-        splunkHeaderIndex = headers.lastWithName(connectorConfig.headerIndex).value().toString();
-        splunkHeaderHost = headers.lastWithName(connectorConfig.headerHost).value().toString();
-        splunkHeaderSource = headers.lastWithName(connectorConfig.headerSource).value().toString();
-        splunkHeaderSourcetype = headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
+    private void setMetadataValues() {
+        splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString();
+        splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString();
+        splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString();
+        splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
     }

     public Headers getHeaders() {
         return headers;
     }
-
     public String getSplunkHeaderIndex() {
         return splunkHeaderIndex;
     }
-
     public String getSplunkHeaderHost() {
         return splunkHeaderHost;
     }
-
     public String getSplunkHeaderSource() {
         return splunkHeaderSource;
     }
-
     public String getSplunkHeaderSourcetype() {
         return splunkHeaderSourcetype;
     }

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 62 additions & 67 deletions
@@ -141,46 +141,40 @@ private void preventTooManyOutstandingEvents() {

     private void handleRaw(final Collection<SinkRecord> records) {
         if(connectorConfig.headerSupport) {
-            boolean first_record = true;
-            List<SinkRecord> likeRecords = new ArrayList<>();
-            KafkaHeaderUtility kafkaHeaderUtility = null;
-
-            for(SinkRecord record : records) {
-                if(first_record) {
-                    likeRecords.add(record);
-                    kafkaHeaderUtility = new KafkaHeaderUtility(record, connectorConfig);
-                    first_record = false;
-                    continue;
-                }
-
-                if(kafkaHeaderUtility.compareRecordHeaders(record)) {
-                    likeRecords.add(record);
-                }
-                else {
-                    EventBatch batch = createRawHeaderEventBatch(kafkaHeaderUtility);
-                    sendEvents(likeRecords, batch);
-                    likeRecords.clear();
-                    likeRecords.add(record);
-                    kafkaHeaderUtility = new KafkaHeaderUtility(record, connectorConfig);
-                }
-            }
-            EventBatch batch = createRawHeaderEventBatch(kafkaHeaderUtility);
-            sendEvents(likeRecords, batch);
+            handleRecordsWithHeader(records);
         }
+
         else if (connectorConfig.hasMetaDataConfigured()) {
             // when setup metadata - index, source, sourcetype, we need partition records for /raw
             Map<TopicPartition, Collection<SinkRecord>> partitionedRecords = partitionRecords(records);
             for (Map.Entry<TopicPartition, Collection<SinkRecord>> entry: partitionedRecords.entrySet()) {
                 EventBatch batch = createRawEventBatch(entry.getKey());
                 sendEvents(entry.getValue(), batch);
             }
-        }
-        else {
+        } else {
             EventBatch batch = createRawEventBatch(null);
             sendEvents(records, batch);
         }
     }

+    private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
+        List<SinkRecord> recordsWithSameHeaders = new ArrayList<>();
+        SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord();
+
+        for (SinkRecord record : records) {
+            if (splunkSinkRecord.compareRecordHeaders(record)) {
+                recordsWithSameHeaders.add(record);
+                continue;
+            }
+
+            EventBatch batch = createRawHeaderEventBatch(splunkSinkRecord);
+            sendEvents(recordsWithSameHeaders, batch);
+            recordsWithSameHeaders.clear();
+            recordsWithSameHeaders.add(record);
+            splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
+        }
+    }
+
     private void handleEvent(final Collection<SinkRecord> records) {
         EventBatch batch = new JsonEventBatch();
         sendEvents(records, batch);
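
A self-contained sketch of the grouping pattern handleRecordsWithHeader implements: walk the records in order, keep consecutive records whose Splunk metadata headers match in one batch, and flush whenever the headers change. Here flush(...) is a hypothetical stand-in for createRawHeaderEventBatch(...) plus sendEvents(...), and the class is assumed to live in the connector's package since compareRecordHeaders is now protected. Note the trailing group also needs a flush after the loop, as the pre-refactor handleRaw did with its final sendEvents call.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch only; flush(...) stands in for createRawHeaderEventBatch + sendEvents.
public class HeaderGroupingSketch {
    static void groupByHeaders(Collection<SinkRecord> records,
                               SplunkSinkConnectorConfig connectorConfig) {
        List<SinkRecord> group = new ArrayList<>();
        SplunkSinkRecord tracker = null;
        for (SinkRecord record : records) {
            // Headers changed: flush the finished group and start a new one.
            if (tracker != null && !tracker.compareRecordHeaders(record)) {
                flush(group, tracker);
                group.clear();
                tracker = new SplunkSinkRecord(record, connectorConfig);
            } else if (tracker == null) {
                tracker = new SplunkSinkRecord(record, connectorConfig);
            }
            group.add(record);
        }
        if (!group.isEmpty()) {
            flush(group, tracker); // the trailing group still needs sending
        }
    }

    private static void flush(List<SinkRecord> group, SplunkSinkRecord headers) {
        // Hypothetical: build a RawEventBatch from the tracked header values
        // and hand the grouped records to the HEC client.
    }
}
```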
@@ -223,12 +217,12 @@ private void send(final EventBatch batch) {
         }
     }

-    private EventBatch createRawHeaderEventBatch(KafkaHeaderUtility kafkaHeaderUtility) {
+    private EventBatch createRawHeaderEventBatch(SplunkSinkRecord splunkSinkRecord) {
         return RawEventBatch.factory()
-                .setIndex(kafkaHeaderUtility.getSplunkHeaderIndex())
-                .setSourcetype(kafkaHeaderUtility.getSplunkHeaderSourcetype())
-                .setSource(kafkaHeaderUtility.getSplunkHeaderSource())
-                .setHost(kafkaHeaderUtility.getSplunkHeaderHost())
+                .setIndex(splunkSinkRecord.getSplunkHeaderIndex())
+                .setSourcetype(splunkSinkRecord.getSplunkHeaderSourcetype())
+                .setSource(splunkSinkRecord.getSplunkHeaderSource())
+                .setHost(splunkSinkRecord.getSplunkHeaderHost())
                 .build();

     }
@@ -288,7 +282,7 @@ private Event createHecEventFrom(final SinkRecord record) {
         if (connectorConfig.raw) {
             RawEvent event = new RawEvent(record.value(), record);
             event.setLineBreaker(connectorConfig.lineBreaker);
-            if(connectorConfig.headerSupport) { event = (RawEvent)addHeadersToHecEvent(event, record); }
+            if(connectorConfig.headerSupport) { event = (RawEvent)addHeaders(event, record); }
             return event;
         }

@@ -303,13 +297,12 @@ private Event createHecEventFrom(final SinkRecord record) {
                 log.error("event does not follow correct HEC pre-formatted format", record.toString());
                 event = createHECEventNonFormatted(record);
             }
-        }
-        else {
+        } else {
             event = createHECEventNonFormatted(record);
         }

         if(connectorConfig.headerSupport) {
-            addHeadersToHecEvent(event, record);
+            addHeaders(event, record);
         }

         if (connectorConfig.trackData) {
@@ -325,41 +318,43 @@ private Event createHecEventFrom(final SinkRecord record) {
         return event;
     }

-    private Event addHeadersToHecEvent(Event event, SinkRecord record) {
-        //TODO: Will need exception handling around String serialization
+    private Event addHeaders(Event event, SinkRecord record) {
         Headers headers = record.headers();
-        log.debug(headers.toString());
-        if (!headers.isEmpty()) {
-            if (headers.lastWithName(connectorConfig.headerIndex) != null) {
-                log.debug("splunk_index header detected, value is: " + headers.lastWithName(connectorConfig.headerIndex).value().toString());
-                event.setIndex(headers.lastWithName(connectorConfig.headerIndex).value().toString());
-            }
-            if (headers.lastWithName(connectorConfig.headerHost) != null) {
-                log.debug("splunk_host header detected, value is: " + headers.lastWithName(connectorConfig.headerHost).value().toString());
-                event.setHost(headers.lastWithName(connectorConfig.headerHost).value().toString());
-            }
-            if (headers.lastWithName(connectorConfig.headerSource) != null) {
-                log.debug("splunk_source header detected, value is: " + headers.lastWithName(connectorConfig.headerSource).value().toString());
-                event.setSource(headers.lastWithName(connectorConfig.headerSource).value().toString());
-            }
-            if (headers.lastWithName(connectorConfig.headerSourcetype) != null) {
-                log.debug("splunk_sourcetype header detected, value is: " + headers.lastWithName(connectorConfig.headerSourcetype).value().toString());
-                event.setSourcetype(headers.lastWithName(connectorConfig.headerSourcetype).value().toString());
-            }
+        if(headers.isEmpty() && connectorConfig.headerCustom.isEmpty()) {
+            return event;
+        }
+
+        if (headers.lastWithName(connectorConfig.headerIndex) != null) {
+            //log.debug("splunk_index header detected, value is: " + headers.lastWithName(connectorConfig.headerIndex).value().toString());
+            event.setIndex(headers.lastWithName(connectorConfig.headerIndex).value().toString());
+        }
+        if (headers.lastWithName(connectorConfig.headerHost) != null) {
+            //log.debug("splunk_host header detected, value is: " + headers.lastWithName(connectorConfig.headerHost).value().toString());
+            event.setHost(headers.lastWithName(connectorConfig.headerHost).value().toString());
+        }
+        if (headers.lastWithName(connectorConfig.headerSource) != null) {
+            //log.debug("splunk_source header detected, value is: " + headers.lastWithName(connectorConfig.headerSource).value().toString());
+            event.setSource(headers.lastWithName(connectorConfig.headerSource).value().toString());
+        }
+        if (headers.lastWithName(connectorConfig.headerSourcetype) != null) {
+            //log.debug("splunk_sourcetype header detected, value is: " + headers.lastWithName(connectorConfig.headerSourcetype).value().toString());
+            event.setSourcetype(headers.lastWithName(connectorConfig.headerSourcetype).value().toString());
+        }

-        if (!connectorConfig.headerCustom.isEmpty()) {
-            String[] customHeaders = connectorConfig.headerCustom.split(",");
-            Map<String, String> headerMap = new HashMap<>();
-            for (String header : customHeaders) {
-                if (headers.lastWithName(header) != null) {
-                    log.debug(header + " header detected, value is: " + headers.lastWithName(header).value().toString());
-                    headerMap.put(header, headers.lastWithName(header).value().toString());
-                } else {
-                    log.debug(header + " header value not present in event.");
-                }
+        // Custom headers are configured with a comma separated list passed in configuration
+        // "custom_header_1,custom_header_2,custom_header_3"
+        if (!connectorConfig.headerCustom.isEmpty()) {
+            String[] customHeaders = connectorConfig.headerCustom.split(",");
+            Map<String, String> headerMap = new HashMap<>();
+            for (String header : customHeaders) {
+                if (headers.lastWithName(header) != null) {
+                    log.debug(header + " header detected, value is: " + headers.lastWithName(header).value().toString());
+                    headerMap.put(header, headers.lastWithName(header).value().toString());
+                } else {
+                    log.debug(header + " header value not present in event.");
                 }
-            event.addFields(headerMap);
             }
+            event.addFields(headerMap);
         }
         return event;
     }
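
To see the custom-header collection in isolation, here is a small runnable sketch of the same split-and-collect behavior. The Kafka Headers lookup is simulated with a plain Map, and all names and values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CustomHeaderSketch {
    public static void main(String[] args) {
        // Comma-separated list, as in the connector configuration.
        String headerCustom = "custom_header_1,custom_header_2";

        // Simulated record headers (the connector reads Headers.lastWithName(...)).
        Map<String, String> recordHeaders = new HashMap<>();
        recordHeaders.put("custom_header_1", "value-1");

        // Collect any configured headers present on the record as extra fields.
        Map<String, String> headerMap = new HashMap<>();
        for (String header : headerCustom.split(",")) {
            String value = recordHeaders.get(header);
            if (value != null) {
                headerMap.put(header, value);
            }
        }
        System.out.println(headerMap); // prints {custom_header_1=value-1}
    }
}
```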

src/test/java/com/splunk/kafka/connect/KafkaHeaderUtilityTest.java renamed to src/test/java/com/splunk/kafka/connect/SplunkSinkRecordTest.java

Lines changed: 9 additions & 11 deletions
@@ -17,14 +17,13 @@

 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;

 import java.util.Map;

-public class KafkaHeaderUtilityTest {
+public class SplunkSinkRecordTest {

     @Test
     public void checkKafkaHeaderUtilityGetters() {
@@ -43,12 +42,12 @@ public void checkKafkaHeaderUtilityGetters() {

         System.out.println(headers.toString());

-        KafkaHeaderUtility kafkaHeaderUtility = new KafkaHeaderUtility(record, connectorConfig);
+        SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);

-        Assert.assertEquals("splunk.header.index", (kafkaHeaderUtility.getSplunkHeaderIndex()));
-        Assert.assertEquals("splunk.header.host", (kafkaHeaderUtility.getSplunkHeaderHost()));
-        Assert.assertEquals("splunk.header.source", (kafkaHeaderUtility.getSplunkHeaderSource()));
-        Assert.assertEquals("splunk.header.sourcetype", (kafkaHeaderUtility.getSplunkHeaderSourcetype()));
+        Assert.assertEquals("splunk.header.index", (splunkSinkRecord.getSplunkHeaderIndex()));
+        Assert.assertEquals("splunk.header.host", (splunkSinkRecord.getSplunkHeaderHost()));
+        Assert.assertEquals("splunk.header.source", (splunkSinkRecord.getSplunkHeaderSource()));
+        Assert.assertEquals("splunk.header.sourcetype", (splunkSinkRecord.getSplunkHeaderSourcetype()));
     }

     @Test
@@ -66,8 +65,7 @@ public void CompareRecordHeaders() {

         SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config);

-        KafkaHeaderUtility kafkaHeaderUtility = new KafkaHeaderUtility(record_1, connectorConfig);
-        kafkaHeaderUtility.setHeaders(headers_1);
+        SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord(record_1, connectorConfig);

         SinkRecord record_2 = setupRecord();

@@ -77,7 +75,7 @@ public void CompareRecordHeaders() {
         headers_2.addString("splunk.header.source", "headersource");
         headers_2.addString("splunk.header.sourcetype", "test message");

-        Assert.assertTrue(kafkaHeaderUtility.compareRecordHeaders(record_2));
+        Assert.assertTrue(splunkSinkRecord.compareRecordHeaders(record_2));

         SinkRecord record_3 = setupRecord();

@@ -87,7 +85,7 @@ public void CompareRecordHeaders() {
         headers_3.addString("splunk.header.source", "headersource");
         headers_3.addString("splunk.header.sourcetype", "test message");

-        Assert.assertFalse(kafkaHeaderUtility.compareRecordHeaders(record_3));
+        Assert.assertFalse(splunkSinkRecord.compareRecordHeaders(record_3));
     }

     public SinkRecord setupRecord() {
