From 4528b17d160f679cb7e6301c68cc3b240a3492dc Mon Sep 17 00:00:00 2001 From: Brett Randall Date: Tue, 10 Oct 2023 21:09:29 +1100 Subject: [PATCH] Fixed potential NullPointerException for Kafka headers with null value. In Connect, a message Header can be null, so additional null-checks are required when processing header values e.g. for the Splunk index, to avoid an NPE. Adds a failing -> passing test. --- .../splunk/kafka/connect/SplunkSinkTask.java | 16 +++++----- .../kafka/connect/SplunkSinkTaskTest.java | 29 +++++++++++++++++-- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 9179a9f1..6bc8ae61 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -222,7 +222,7 @@ public String headerId(SinkRecord sinkRecord) { StringBuilder headerString = new StringBuilder(); - if(indexHeader != null) { + if(indexHeader != null && indexHeader.value() != null) { headerString.append(indexHeader.value().toString()); } else { if(metas != null) { @@ -232,7 +232,7 @@ public String headerId(SinkRecord sinkRecord) { headerString.append(insertHeaderToken()); - if(hostHeader != null) { + if(hostHeader != null && hostHeader.value() != null) { headerString.append(hostHeader.value().toString()); } else { if(metas != null) { @@ -242,7 +242,7 @@ public String headerId(SinkRecord sinkRecord) { headerString.append(insertHeaderToken()); - if(sourceHeader != null) { + if(sourceHeader != null && sourceHeader.value() != null) { headerString.append(sourceHeader.value().toString()); } else { if(metas != null) { @@ -252,7 +252,7 @@ public String headerId(SinkRecord sinkRecord) { headerString.append(insertHeaderToken()); - if(sourcetypeHeader != null) { + if(sourcetypeHeader != null && sourcetypeHeader.value() != null) { headerString.append(sourcetypeHeader.value().toString()); } else { if(metas != null) { @@ -450,16 +450,16 @@ private Event addHeaders(Event event, SinkRecord record) { Header headerSource = headers.lastWithName(connectorConfig.headerSource); Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype); - if (headerIndex != null) { + if (headerIndex != null && headerIndex.value() != null) { event.setIndex(headerIndex.value().toString()); } - if (headerHost != null) { + if (headerHost != null && headerHost.value() != null) { event.setHost(headerHost.value().toString()); } - if (headerSource != null) { + if (headerSource != null && headerSource.value() != null) { event.setSource(headerSource.value().toString()); } - if (headerSourcetype != null) { + if (headerSourcetype != null && headerSourcetype.value() != null) { event.setSourcetype(headerSourcetype.value().toString()); } diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index f5ae931e..c9014ecc 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -17,19 +17,17 @@ import com.splunk.hecclient.Event; import com.splunk.hecclient.EventBatch; -import com.splunk.hecclient.JsonEvent; import com.splunk.hecclient.RawEventBatch; -import org.apache.commons.logging.Log; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Assert; import org.junit.Test; -import java.text.ParseException; import java.util.*; public class SplunkSinkTaskTest { @@ -242,6 +240,25 @@ public void putWithNullEvent() { task.stop(); } + @Test + public void putWithNullIndexHeaderValue() { + UnitUtil uu = new UnitUtil(0); + Map config = uu.createTaskConfig(); + config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true)); + config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true)); + config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1)); + config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true")); + config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index"); + SplunkSinkTask task = new SplunkSinkTask(); + HecMock hec = new HecMock(task); + hec.setSendReturnResult(HecMock.success); + task.setHec(hec); + task.start(config); + task.put(createSinkRecordWithNullIndexHeaderValue()); + Assert.assertEquals(1, hec.getBatches().size()); + task.stop(); + } + @Test public void putWithRawAndAck() { putWithSuccess(true, true); @@ -455,6 +472,12 @@ private Collection createNullSinkRecord() { return records; } + private Collection createSinkRecordWithNullIndexHeaderValue() { + List records = new ArrayList<>(createSinkRecords(1)); + records.get(0).headers().add("index", null, null); + return records; + } + private List createTopicPartitionList() { ArrayList tps = new ArrayList<>(); tps.add(new TopicPartition("mytopic", 1));