14 changes: 11 additions & 3 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -158,7 +158,17 @@ private void validateSplunkConfigurations(final Map<String, String> configs) thr
     }

     private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
-        Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
+        Header[] headers;
+        if (connectorConfig.ack) {
+            headers = new Header[]{
+                new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken)),
+                new BasicHeader("X-Splunk-Request-Channel", java.util.UUID.randomUUID().toString())
+            };
+        } else {
+            headers = new Header[]{
+                new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken)),
+            };
+        }
         String endpoint = "/services/collector";
         String url = connectorConfig.splunkURI + endpoint;
         final HttpPost httpPost = new HttpPost(url);
@@ -206,6 +216,4 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht
             }
         }
     }
-
-
 }
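Note: with `splunk.hec.ack.enabled=true`, Splunk HEC requires every request to carry a channel GUID and answers with an `ackId` that can be polled until the event is indexed; the new `X-Splunk-Request-Channel` header supplies that GUID. A minimal sketch of the flow in Python, with hypothetical endpoint and token values (not taken from this PR):

```python
import json
import uuid

import requests

HEC_URI = "https://splunk.example.com:8088"    # assumption: HEC base URL
TOKEN = "00000000-0000-0000-0000-000000000000"  # assumption: ack-enabled HEC token

channel = str(uuid.uuid4())  # same role as the X-Splunk-Request-Channel header above
headers = {
    "Authorization": f"Splunk {TOKEN}",
    "X-Splunk-Request-Channel": channel,
}

# With indexer acknowledgment enabled, HEC rejects events that arrive without a channel.
resp = requests.post(f"{HEC_URI}/services/collector",
                     headers=headers,
                     data=json.dumps({"event": "hello"}),
                     verify=False)
ack_id = resp.json()["ackId"]

# Poll the ack endpoint until Splunk confirms the event was indexed.
status = requests.post(f"{HEC_URI}/services/collector/ack",
                       params={"channel": channel},
                       headers=headers,
                       data=json.dumps({"acks": [ack_id]}),
                       verify=False)
print(status.json())  # e.g. {"acks": {"0": true}} once indexing completes
```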
1 change: 1 addition & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -429,6 +429,7 @@ private Event createHecEventFrom(final SinkRecord record) {
             trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
             trackMetas.put("kafka_topic", record.topic());
             trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition()));
+            trackMetas.put("kafka_record_key", String.valueOf(record.key()));
             if (HOSTNAME != null)
                 trackMetas.put("kafka_connect_host", HOSTNAME);
             event.addFields(trackMetas);
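With `splunk.hec.track.data` enabled, the record key now rides along with the other `kafka_*` metadata as indexed fields. A sketch of the resulting HEC `/event` payload shape (field values here are illustrative, not captured connector output):

```python
# Illustrative shape of an enriched HEC event after this change.
payload = {
    "event": "record body",
    "fields": {
        "kafka_timestamp": "0",
        "kafka_topic": "record_key",
        "kafka_partition": "1",
        "kafka_record_key": "{}",  # String.valueOf(record.key()) yields "null" for keyless records
        "kafka_connect_host": "connect-worker-1",
    },
}
```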
@@ -131,6 +131,44 @@ public void testInvalidKerberosOnlyKeytabSet() {
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
     }

+    @Test
+    public void testInvalidJsonEventEnrichmentConfig1() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("tasks_max", "3");
+        configs.put("splunk.hec.json.event.enrichment", "k1=v1 k2=v2");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+    }
+
+    @Test
+    public void testInvalidJsonEventEnrichmentConfig2() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.hec.json.event.enrichment", "testing-testing non KV");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+    }
+
+    @Test
+    public void testInvalidJsonEventEnrichmentConfig3() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("tasks_max", "3");
+        configs.put("splunk.hec.json.event.enrichment", "k1=v1 k2=v2");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+    }
+
     @Test
     public void testInvalidToken() {
         final Map<String, String> configs = new HashMap<>();
@@ -144,6 +182,18 @@ public void testInvalidToken() {
         Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
     }

+    @Test
+    public void testNullHecToken() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.hec.token", null);
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(java.lang.NullPointerException.class, ()->connector.validate(configs));
+    }
+
     @Test
     public void testInvalidIndex() {
         final Map<String, String> configs = new HashMap<>();
@@ -415,6 +415,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
             Assert.assertEquals(String.valueOf(1), event.getFields().get("kafka_partition"));
             Assert.assertEquals(new UnitUtil(0).configProfile.getTopics(), event.getFields().get("kafka_topic"));
             Assert.assertEquals(String.valueOf(0), event.getFields().get("kafka_timestamp"));
+            Assert.assertEquals("test", event.getFields().get("kafka_record_key"));
             j++;
         }

@@ -441,7 +442,7 @@ private Collection<SinkRecord> createSinkRecords(int numOfRecords, String value)
     private Collection<SinkRecord> createSinkRecords(int numOfRecords, int start, String value) {
         List<SinkRecord> records = new ArrayList<>();
         for (int i = start; i < start + numOfRecords; i++) {
-            SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, null, null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
+            SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, "test", null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
             records.add(rec);
         }
         return records;
4 changes: 2 additions & 2 deletions test/conftest.py
@@ -39,7 +39,7 @@ def setup(request):
 def pytest_configure():
     # Generate message data
     topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],"prototopic",
-              "test_splunk_hec_malformed_events","epoch_format","date_format"]
+              "test_splunk_hec_malformed_events","epoch_format","date_format","record_key"]

     create_kafka_topics(config, topics)
     producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
@@ -67,9 +67,9 @@ def pytest_configure():
                       ('splunk.header.source', b'kafka_custom_header_source'),
                       ('splunk.header.sourcetype', b'kafka_custom_header_sourcetype')]
     producer.send(config["kafka_header_topic"], msg, headers=headers_to_send)
-
     producer.send("test_splunk_hec_malformed_events", {})
     producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
+    producer.send("record_key",{"timestamp": config['timestamp']},b"{}")
     protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
     timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
     timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
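Worth noting: in kafka-python, `KafkaProducer.send(topic, value, key)` takes the record key as the third positional argument, so the `b"{}"` above becomes the Kafka record key that the connector later reports as `kafka_record_key`. A self-contained sketch (broker address is a placeholder):

```python
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumption: local broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# send(topic, value, key) -- the third argument is the record key,
# which SplunkSinkTask surfaces as the kafka_record_key field.
producer.send("record_key", {"timestamp": 1555209605000}, b"{}")
producer.flush()
```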
17 changes: 6 additions & 11 deletions test/lib/connect_params.py
@@ -83,10 +83,6 @@
      "tasks_max": "3",
      "splunk_hec_raw": False,
      "splunk_hec_json_event_enrichment": "chars=test_tasks_max_3_hec_raw_false"},
-    {"name": "test_tasks_max_null",
-     "tasks_max": "null",
-     "splunk_hec_raw": False,
-     "splunk_hec_json_event_enrichment": "chars=test_tasks_max_null"},
     {"name": "test_1_source_hec_raw_true",
      "splunk_hec_raw": True,
      "splunk_sources": "test_1_source_hec_raw_true"},
@@ -179,12 +175,6 @@
      "splunk_hec_raw": True,
      "splunk_hec_json_event_formatted": "false",
      "splunk_sourcetypes": "test_splunk_hec_json_event_formatted_false_raw_data"},
-    {"name": "test_empty_hec_token",
-     "splunk_hec_token": None,
-     "splunk_hec_json_event_enrichment": "chars=test_empty_hec_token"},
-    {"name": "test_incorrect_hec_token",
-     "splunk_hec_token": "dummy-tocken",
-     "splunk_hec_json_event_enrichment": "chars=test_incorrect_hec_token"},
     {"name": "test_splunk_hec_empty_event",
      "topics": "test_splunk_hec_malformed_events",
      "splunk_hec_raw": False,
@@ -214,5 +204,10 @@
      "splunk_hec_raw": False,
      "enable_timestamp_extraction" : "true",
      "timestamp_regex": r"\\\"time\\\":\\s*\\\"(?<time>.*?)\"",
-     "timestamp_format": "epoch"}
+     "timestamp_format": "epoch"},
+    {"name": "test_extracted_record_key",
+     "splunk_sourcetypes": "track_record_key",
+     "topics": "record_key",
+     "splunk_hec_raw": False,
+     "splunk_hec_track_data": "true"}
 ]
3 changes: 2 additions & 1 deletion test/lib/connector.template
@@ -50,6 +50,7 @@
     "value.converter.schemas.enable": "{{value_converter_schemas_enable}}",
     "enable.timestamp.extraction": "{{enable_timestamp_extraction}}",
     "timestamp.regex": "{{timestamp_regex}}",
-    "timestamp.format": "{{timestamp_format}}"
+    "timestamp.format": "{{timestamp_format}}",
+    "splunk.hec.track.data": "{{splunk_hec_track_data}}"
   }
 }
3 changes: 2 additions & 1 deletion test/lib/data_gen.py
@@ -38,7 +38,8 @@ def generate_connector_content(input_disc=None):
         "value_converter_schemas_enable": "false",
         "enable_timestamp_extraction": "false",
         "regex": "",
-        "timestamp_format": ""
+        "timestamp_format": "",
+        "splunk_hec_track_data": "false"
     }

     if input_disc:
17 changes: 1 addition & 16 deletions test/testcases/test_configurations.py
@@ -25,7 +25,7 @@ def setup_class(self, setup):
         ("test_tasks_max_1_hec_raw_false", "chars::test_tasks_max_1_hec_raw_false", 3),
         # ("test_tasks_max_3_hec_raw_true", "sourcetype::raw_data-tasks_max_3", 1),
         # ("test_tasks_max_3_hec_raw_false", "chars::test_tasks_max_3_hec_raw_false", 3),
-        ("test_tasks_max_null", "chars::test_tasks_max_null", 0)
+        # ("test_tasks_max_null", "chars::test_tasks_max_null", 0)
     ])
     def test_tasks_max(self, setup, test_scenario, test_input, expected):
         logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
@@ -141,21 +141,6 @@ def test_header_support_true_event_data(self, setup, test_scenario, test_input,
         logger.info("Splunk received %s events in the last 15m", len(events))
         assert len(events) == expected

-    @pytest.mark.parametrize("test_case, test_input, expected", [
-        ("test_incorrect_hec_token", "chars::test_incorrect_hec_token", 0),
-        ("test_empty_hec_token", "chars::test_empty_hec_token", 0)
-    ])
-    def test_create_connector_with_incorrect_hec_token(self, setup, test_case, test_input, expected):
-        search_query = f"index={setup['kafka_header_index']} | search timestamp=\"{setup['timestamp']}\" {test_input}"
-        logger.info(search_query)
-        events = check_events_from_splunk(start_time="-15m@m",
-                                          url=setup["splunkd_url"],
-                                          user=setup["splunk_user"],
-                                          query=[f"search {search_query}"],
-                                          password=setup["splunk_password"])
-        logger.info("Splunk received %s events in the last 15m", len(events))
-        assert len(events) == expected
-
     @pytest.mark.parametrize("test_scenario, test_input, expected", [
         ("test_splunk_hec_json_event_formatted_true_event_data",
          "chars::test_splunk_hec_json_event_formatted_true_event_data", 3),
21 changes: 8 additions & 13 deletions test/testcases/test_crud.py
@@ -36,7 +36,7 @@ def test_valid_crud_tasks(self, setup, test_input, expected):
             "tasks.max": "3",
             "topics": setup["kafka_topic"],
             "splunk.indexes": setup["splunk_index"],
-            "splunk.hec.uri": setup["splunkd_url"],
+            "splunk.hec.uri": setup["splunk_hec_url"],
             "splunk.hec.token": setup["splunk_token"],
             "splunk.hec.raw": "false",
             "splunk.hec.ack.enabled": "false",
@@ -56,7 +56,7 @@
             "tasks.max": "5",
             "topics": setup["kafka_topic"],
             "splunk.indexes": setup["splunk_index"],
-            "splunk.hec.uri": setup["splunkd_url"],
+            "splunk.hec.uri": setup["splunk_hec_url"],
             "splunk.hec.token": setup["splunk_token"],
             "splunk.hec.raw": "false",
             "splunk.hec.ack.enabled": "false",
@@ -101,30 +101,25 @@ def test_invalid_crud_tasks(self, setup, test_case, config_input, expected):

     @pytest.mark.parametrize("test_case, config_input, expected", [
         ("event_enrichment_non_key_value", {"name": "event_enrichment_non_key_value",
-                                            "splunk_hec_json_event_enrichment": "testing-testing non KV"},
-         ["FAILED"]),
+                                            "splunk_hec_json_event_enrichment": "testing-testing non KV"}, False),
         ("event_enrichment_non_key_value_3_tasks", {"name": "event_enrichment_non_key_value_3_tasks",
                                                     "tasks_max": "3",
-                                                    "splunk_hec_json_event_enrichment": "testing-testing non KV"},
-         ["FAILED", "FAILED", "FAILED"]),
+                                                    "splunk_hec_json_event_enrichment": "testing-testing non KV"}, False),
         ("event_enrichment_not_separated_by_commas", {"name": "event_enrichment_not_separated_by_commas",
-                                                      "splunk_hec_json_event_enrichment": "key1=value1 key2=value2"},
-         ["FAILED"]),
+                                                      "splunk_hec_json_event_enrichment": "key1=value1 key2=value2"}, False),
         ("event_enrichment_not_separated_by_commas_3_tasks", {"name": "event_enrichment_not_separated_by_commas_3_tasks",
                                                               "tasks_max": "3",
-                                                              "splunk_hec_json_event_enrichment": "key1=value1 key2=value2"},
-         ["FAILED", "FAILED", "FAILED"])
+                                                              "splunk_hec_json_event_enrichment": "key1=value1 key2=value2"}, False)

     ])
     def test_invalid_crud_event_enrichment_tasks(self, setup, test_case, config_input, expected):
         '''
         Test that invalid event_enrichment kafka connect task can be created but task status should be FAILED
         and no data should enter splunk
         '''
-        logger.info(f"testing {test_case} input={config_input} expected={expected} ")
+        logger.info(f"testing {test_case} input={config_input}")

         connector_definition_invalid_tasks = generate_connector_content(config_input)
         setup['connectors'].append(test_case)

-        assert create_kafka_connector(setup, connector_definition_invalid_tasks) is True
-        assert get_running_kafka_connector_task_status(setup, connector_definition_invalid_tasks) == expected
+        assert create_kafka_connector(setup, connector_definition_invalid_tasks) == expected
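The rewritten assertion reflects that malformed `splunk.hec.json.event.enrichment` values are now rejected by `connector.validate` at creation time, instead of producing tasks that start and then report FAILED. Roughly, a helper like `create_kafka_connector` would map that onto the Kafka Connect REST API as below (URL and helper shape are assumptions, not this repo's code):

```python
import requests

def create_kafka_connector(connect_url: str, definition: dict) -> bool:
    # Kafka Connect answers POST /connectors with 201 Created on success;
    # a config rejected during validation surfaces as an error status instead.
    resp = requests.post(f"{connect_url}/connectors", json=definition,
                         headers={"Content-Type": "application/json"})
    return resp.status_code == 201

# e.g. create_kafka_connector("http://localhost:8083",
#                             {"name": "bad-enrichment", "config": {...}})
```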
19 changes: 19 additions & 0 deletions test/testcases/test_data_enrichment.py
@@ -59,3 +59,22 @@ def test_line_breaking_configuration(self, setup, test_case, test_input, expecte
                                                   setup["timestamp"], setup["timestamp"], setup["timestamp"])
         assert actual_raw_data == expected_data, \
             f'\nActual value: \n{actual_raw_data} \ndoes not match expected value: \n{expected_data}'
+
+    @pytest.mark.parametrize("test_scenario, test_input, expected", [
+        ("record_key_extraction", "sourcetype::track_record_key", "{}"),
+    ])
+    def test_record_key_data_enrichment(self, setup, test_scenario, test_input, expected):
+        logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
+        search_query = f"index={setup['splunk_index']} | search {test_input} | fields *"
+        logger.info(search_query)
+        events = check_events_from_splunk(start_time="-15m@m",
+                                          url=setup["splunkd_url"],
+                                          user=setup["splunk_user"],
+                                          query=[f"search {search_query}"],
+                                          password=setup["splunk_password"])
+        logger.info("Splunk received %s events in the last hour", len(events))
+
+        if(len(events)==1):
+            assert events[0]["kafka_record_key"] == expected
+        else:
+            assert False, "No event found or duplicate events found"