From b55b1259f432db899708d14b7397ffd4c32d8f0f Mon Sep 17 00:00:00 2001 From: Brett Randall Date: Tue, 10 Oct 2023 21:09:29 +1100 Subject: [PATCH] Fixed potential NullPointerException for Kafka headers with null value. In Connect, a message Header can be null, so additional null-checks are required when processing header values e.g. for the Splunk index, to avoid an NPE. Adds a failing -> passing test. --- .../splunk/kafka/connect/SplunkSinkTask.java | 16 ++++++------ .../kafka/connect/SplunkSinkTaskTest.java | 25 +++++++++++++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 407c2c97..ee9e969b 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -212,7 +212,7 @@ public String headerId(SinkRecord sinkRecord) { StringBuilder headerString = new StringBuilder(); - if(indexHeader != null) { + if(indexHeader != null && indexHeader.value() != null) { headerString.append(indexHeader.value().toString()); } else { if(metas != null) { @@ -222,7 +222,7 @@ public String headerId(SinkRecord sinkRecord) { headerString.append(insertHeaderToken()); - if(hostHeader != null) { + if(hostHeader != null && hostHeader.value() != null) { headerString.append(hostHeader.value().toString()); } else { if(metas != null) { @@ -232,7 +232,7 @@ public String headerId(SinkRecord sinkRecord) { headerString.append(insertHeaderToken()); - if(sourceHeader != null) { + if(sourceHeader != null && sourceHeader.value() != null) { headerString.append(sourceHeader.value().toString()); } else { if(metas != null) { @@ -242,7 +242,7 @@ public String headerId(SinkRecord sinkRecord) { headerString.append(insertHeaderToken()); - if(sourcetypeHeader != null) { + if(sourcetypeHeader != null && sourcetypeHeader.value() != null) { headerString.append(sourcetypeHeader.value().toString()); } else { if(metas != null) { @@ -438,16 +438,16 @@ private Event addHeaders(Event event, SinkRecord record) { Header headerSource = headers.lastWithName(connectorConfig.headerSource); Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype); - if (headerIndex != null) { + if (headerIndex != null && headerIndex.value() != null) { event.setIndex(headerIndex.value().toString()); } - if (headerHost != null) { + if (headerHost != null && headerHost.value() != null) { event.setHost(headerHost.value().toString()); } - if (headerSource != null) { + if (headerSource != null && headerSource.value() != null) { event.setSource(headerSource.value().toString()); } - if (headerSourcetype != null) { + if (headerSourcetype != null && headerSourcetype.value() != null) { event.setSourcetype(headerSourcetype.value().toString()); } diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index b430e860..300b5eb1 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -241,6 +241,25 @@ public void putWithNullEvent() { task.stop(); } + @Test + public void putWithNullIndexHeaderValue() { + UnitUtil uu = new UnitUtil(0); + Map config = uu.createTaskConfig(); + config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true)); + config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true)); + config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1)); + config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true")); + config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index"); + SplunkSinkTask task = new SplunkSinkTask(); + HecMock hec = new HecMock(task); + hec.setSendReturnResult(HecMock.success); + task.setHec(hec); + task.start(config); + task.put(createSinkRecordWithNullIndexHeaderValue()); + Assert.assertEquals(1, hec.getBatches().size()); + task.stop(); + } + @Test public void putWithRawAndAck() { putWithSuccess(true, true); @@ -341,6 +360,12 @@ private Collection createNullSinkRecord() { return records; } + private Collection createSinkRecordWithNullIndexHeaderValue() { + List records = new ArrayList<>(createSinkRecords(1)); + records.get(0).headers().add("index", null, null); + return records; + } + private List createTopicPartitionList() { ArrayList tps = new ArrayList<>(); tps.add(new TopicPartition("mytopic", 1));