diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index ee0b28f8..b80c542d 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -158,7 +158,17 @@ private void validateSplunkConfigurations(final Map configs) thr } private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException { - Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))}; + Header[] headers; + if (connectorConfig.ack) { + headers = new Header[]{ + new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken)), + new BasicHeader("X-Splunk-Request-Channel", java.util.UUID.randomUUID().toString()) + }; + } else { + headers = new Header[]{ + new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken)), + }; + } String endpoint = "/services/collector"; String url = connectorConfig.splunkURI + endpoint; final HttpPost httpPost = new HttpPost(url); @@ -206,6 +216,4 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht } } } - - } \ No newline at end of file diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index af1494ee..9179a9f1 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -429,6 +429,7 @@ private Event createHecEventFrom(final SinkRecord record) { trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp())); trackMetas.put("kafka_topic", record.topic()); trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition())); + trackMetas.put("kafka_record_key", String.valueOf(record.key())); if (HOSTNAME != null) trackMetas.put("kafka_connect_host", HOSTNAME); event.addFields(trackMetas); diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java index 58308348..07c302d5 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java @@ -131,6 +131,44 @@ public void testInvalidKerberosOnlyKeytabSet() { assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set"); } + @Test + public void testInvalidJsonEventEnrichmentConfig1() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("tasks_max", "3"); + configs.put("splunk.hec.json.event.enrichment", "k1=v1 k2=v2"); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); + } + + @Test + public void testInvalidJsonEventEnrichmentConfig2() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("splunk.hec.json.event.enrichment", "testing-testing non KV"); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); + } + + @Test + public void testInvalidJsonEventEnrichmentConfig3() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("tasks_max", "3"); + configs.put("splunk.hec.json.event.enrichment", "k1=v1 k2=v2"); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); + } + @Test public void testInvalidToken() { final Map configs = new HashMap<>(); @@ -144,6 +182,18 @@ public void testInvalidToken() { Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); } + @Test + public void testNullHecToken() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("splunk.hec.token", null); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(java.lang.NullPointerException.class, ()->connector.validate(configs)); + } + @Test public void testInvalidIndex() { final Map configs = new HashMap<>(); diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index 2a8435bc..f5ae931e 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -415,6 +415,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) { Assert.assertEquals(String.valueOf(1), event.getFields().get("kafka_partition")); Assert.assertEquals(new UnitUtil(0).configProfile.getTopics(), event.getFields().get("kafka_topic")); Assert.assertEquals(String.valueOf(0), event.getFields().get("kafka_timestamp")); + Assert.assertEquals("test", event.getFields().get("kafka_record_key")); j++; } @@ -441,7 +442,7 @@ private Collection createSinkRecords(int numOfRecords, String value) private Collection createSinkRecords(int numOfRecords, int start, String value) { List records = new ArrayList<>(); for (int i = start; i < start + numOfRecords; i++) { - SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, null, null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE); + SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, "test", null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE); records.add(rec); } return records; diff --git a/test/conftest.py b/test/conftest.py index 0db4956d..45dd400d 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -39,7 +39,7 @@ def setup(request): def pytest_configure(): # Generate message data topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],"prototopic", - "test_splunk_hec_malformed_events","epoch_format","date_format"] + "test_splunk_hec_malformed_events","epoch_format","date_format","record_key"] create_kafka_topics(config, topics) producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], @@ -67,9 +67,9 @@ def pytest_configure(): ('splunk.header.source', b'kafka_custom_header_source'), ('splunk.header.sourcetype', b'kafka_custom_header_sourcetype')] producer.send(config["kafka_header_topic"], msg, headers=headers_to_send) - producer.send("test_splunk_hec_malformed_events", {}) producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]}) + producer.send("record_key",{"timestamp": config['timestamp']},b"{}") protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many') timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}") timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}") diff --git a/test/lib/connect_params.py b/test/lib/connect_params.py index 1833292b..5cc6e4ef 100644 --- a/test/lib/connect_params.py +++ b/test/lib/connect_params.py @@ -83,10 +83,6 @@ "tasks_max": "3", "splunk_hec_raw": False, "splunk_hec_json_event_enrichment": "chars=test_tasks_max_3_hec_raw_false"}, - {"name": "test_tasks_max_null", - "tasks_max": "null", - "splunk_hec_raw": False, - "splunk_hec_json_event_enrichment": "chars=test_tasks_max_null"}, {"name": "test_1_source_hec_raw_true", "splunk_hec_raw": True, "splunk_sources": "test_1_source_hec_raw_true"}, @@ -179,12 +175,6 @@ "splunk_hec_raw": True, "splunk_hec_json_event_formatted": "false", "splunk_sourcetypes": "test_splunk_hec_json_event_formatted_false_raw_data"}, - {"name": "test_empty_hec_token", - "splunk_hec_token": None, - "splunk_hec_json_event_enrichment": "chars=test_empty_hec_token"}, - {"name": "test_incorrect_hec_token", - "splunk_hec_token": "dummy-tocken", - "splunk_hec_json_event_enrichment": "chars=test_incorrect_hec_token"}, {"name": "test_splunk_hec_empty_event", "topics": "test_splunk_hec_malformed_events", "splunk_hec_raw": False, @@ -214,5 +204,10 @@ "splunk_hec_raw": False, "enable_timestamp_extraction" : "true", "timestamp_regex": r"\\\"time\\\":\\s*\\\"(?