Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,4 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht
}
}
}


}
1 change: 1 addition & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ private Event createHecEventFrom(final SinkRecord record) {
trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
trackMetas.put("kafka_topic", record.topic());
trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition()));
trackMetas.put("kafka_record_key", String.valueOf(record.key()));
if (HOSTNAME != null)
trackMetas.put("kafka_connect_host", HOSTNAME);
event.addFields(trackMetas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
Assert.assertEquals(String.valueOf(1), event.getFields().get("kafka_partition"));
Assert.assertEquals(new UnitUtil(0).configProfile.getTopics(), event.getFields().get("kafka_topic"));
Assert.assertEquals(String.valueOf(0), event.getFields().get("kafka_timestamp"));
Assert.assertEquals("test", event.getFields().get("kafka_record_key"));
j++;
}

Expand All @@ -441,7 +442,7 @@ private Collection<SinkRecord> createSinkRecords(int numOfRecords, String value)
private Collection<SinkRecord> createSinkRecords(int numOfRecords, int start, String value) {
List<SinkRecord> records = new ArrayList<>();
for (int i = start; i < start + numOfRecords; i++) {
SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, null, null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, "test", null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
records.add(rec);
}
return records;
Expand Down
4 changes: 2 additions & 2 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def setup(request):
def pytest_configure():
# Generate message data
topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],"prototopic",
"test_splunk_hec_malformed_events","epoch_format","date_format"]
"test_splunk_hec_malformed_events","epoch_format","date_format","record_key"]

create_kafka_topics(config, topics)
producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
Expand Down Expand Up @@ -67,9 +67,9 @@ def pytest_configure():
('splunk.header.source', b'kafka_custom_header_source'),
('splunk.header.sourcetype', b'kafka_custom_header_sourcetype')]
producer.send(config["kafka_header_topic"], msg, headers=headers_to_send)

producer.send("test_splunk_hec_malformed_events", {})
producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
producer.send("record_key",{"timestamp": config['timestamp']},b"{}")
protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
Expand Down
9 changes: 7 additions & 2 deletions test/lib/connect_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,5 +204,10 @@
"splunk_hec_raw": False,
"enable_timestamp_extraction" : "true",
"timestamp_regex": r"\\\"time\\\":\\s*\\\"(?<time>.*?)\"",
"timestamp_format": "epoch"}
]
"timestamp_format": "epoch"},
{"name": "test_extracted_record_key",
"splunk_sourcetypes": "track_record_key",
"topics": "record_key",
"splunk_hec_raw": False,
"splunk_hec_track_data": "true"}
]
3 changes: 2 additions & 1 deletion test/lib/connector.template
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"value.converter.schemas.enable": "{{value_converter_schemas_enable}}",
"enable.timestamp.extraction": "{{enable_timestamp_extraction}}",
"timestamp.regex": "{{timestamp_regex}}",
"timestamp.format": "{{timestamp_format}}"
"timestamp.format": "{{timestamp_format}}",
"splunk.hec.track.data": "{{splunk_hec_track_data}}"
}
}
3 changes: 2 additions & 1 deletion test/lib/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def generate_connector_content(input_disc=None):
"value_converter_schemas_enable": "false",
"enable_timestamp_extraction": "false",
"regex": "",
"timestamp_format": ""
"timestamp_format": "",
"splunk_hec_track_data": "false"
}

if input_disc:
Expand Down
19 changes: 19 additions & 0 deletions test/testcases/test_data_enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,22 @@ def test_line_breaking_configuration(self, setup, test_case, test_input, expecte
setup["timestamp"], setup["timestamp"], setup["timestamp"])
assert actual_raw_data == expected_data, \
f'\nActual value: \n{actual_raw_data} \ndoes not match expected value: \n{expected_data}'

@pytest.mark.parametrize("test_scenario, test_input, expected", [
("record_key_extraction", "sourcetype::track_record_key", "{}"),
])
def test_record_key_data_enrichment(self, setup, test_scenario, test_input, expected):
logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
search_query = f"index={setup['splunk_index']} | search {test_input} | fields *"
logger.info(search_query)
events = check_events_from_splunk(start_time="-15m@m",
url=setup["splunkd_url"],
user=setup["splunk_user"],
query=[f"search {search_query}"],
password=setup["splunk_password"])
logger.info("Splunk received %s events in the last hour", len(events))

if(len(events)==1):
assert events[0]["kafka_record_key"] == expected
else:
assert False,"No event found or duplicate events found"