|
17 | 17 |
|
18 | 18 | import com.splunk.hecclient.Event; |
19 | 19 | import com.splunk.hecclient.EventBatch; |
20 | | -import com.splunk.hecclient.JsonEvent; |
21 | 20 | import com.splunk.hecclient.RawEventBatch; |
22 | | -import org.apache.commons.logging.Log; |
23 | 21 | import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
24 | 22 | import org.apache.kafka.common.TopicPartition; |
25 | 23 | import org.apache.kafka.common.config.ConfigException; |
| 24 | +import org.apache.kafka.common.header.Header; |
26 | 25 | import org.apache.kafka.common.record.TimestampType; |
27 | 26 | import org.apache.kafka.connect.errors.RetriableException; |
28 | 27 | import org.apache.kafka.connect.sink.SinkRecord; |
29 | 28 | import org.junit.Assert; |
30 | 29 | import org.junit.Test; |
31 | 30 |
|
32 | | -import java.text.ParseException; |
33 | 31 | import java.util.*; |
34 | 32 |
|
35 | 33 | public class SplunkSinkTaskTest { |
@@ -242,6 +240,25 @@ public void putWithNullEvent() { |
242 | 240 | task.stop(); |
243 | 241 | } |
244 | 242 |
|
| 243 | + @Test |
| 244 | + public void putWithNullIndexHeaderValue() { |
| 245 | + UnitUtil uu = new UnitUtil(0); |
| 246 | + Map<String, String> config = uu.createTaskConfig(); |
| 247 | + config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true)); |
| 248 | + config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true)); |
| 249 | + config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1)); |
| 250 | + config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true")); |
| 251 | + config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index"); |
| 252 | + SplunkSinkTask task = new SplunkSinkTask(); |
| 253 | + HecMock hec = new HecMock(task); |
| 254 | + hec.setSendReturnResult(HecMock.success); |
| 255 | + task.setHec(hec); |
| 256 | + task.start(config); |
| 257 | + task.put(createSinkRecordWithNullIndexHeaderValue()); |
| 258 | + Assert.assertEquals(1, hec.getBatches().size()); |
| 259 | + task.stop(); |
| 260 | + } |
| 261 | + |
245 | 262 | @Test |
246 | 263 | public void putWithRawAndAck() { |
247 | 264 | putWithSuccess(true, true); |
@@ -455,6 +472,12 @@ private Collection<SinkRecord> createNullSinkRecord() { |
455 | 472 | return records; |
456 | 473 | } |
457 | 474 |
|
| 475 | + private Collection<SinkRecord> createSinkRecordWithNullIndexHeaderValue() { |
| 476 | + List<SinkRecord> records = new ArrayList<>(createSinkRecords(1)); |
| 477 | + records.get(0).headers().add("index", null, null); |
| 478 | + return records; |
| 479 | + } |
| 480 | + |
458 | 481 | private List<TopicPartition> createTopicPartitionList() { |
459 | 482 | ArrayList<TopicPartition> tps = new ArrayList<>(); |
460 | 483 | tps.add(new TopicPartition("mytopic", 1)); |
|
0 commit comments