Currently, like records are batched together if they have the same metadata (index, source, sourcetype, host).
The current batching works by walking the events sequentially and starting a new batch whenever the metadata differs from the previous event's. Suggestions have been made to split out these batches in a better way.
Essentially, this is a categorization function. It is simpler to just implement equals() and hashCode() (or a similar idea) on the KafkaHeaderUtility class and then build a hash table to do the categorization. The handler would then look like the code below:
void handleRaw(final Collection<SinkRecord> records) {
    if (connectorConfig.headerSupport) {
        // worth refactoring the header case into its own function, since it is complicated
        handleRecordsWithHeaders(records);
    } else if (...) {
        ...
    } else {
        ...
    }
}
private void handleRecordsWithHeaders(final Collection<SinkRecord> records) {
    // bucket records by header metadata; records whose headers compare equal land in the same list
    Map<SplunkSinkRecord, List<SplunkSinkRecord>> recordsWithSameHeaders = new HashMap<>();
    for (SinkRecord r : records) {
        SplunkSinkRecord sr = new SplunkSinkRecord(r);
        if (!recordsWithSameHeaders.containsKey(sr)) {
            recordsWithSameHeaders.put(sr, new ArrayList<>());
        }
        recordsWithSameHeaders.get(sr).add(sr);
    }
    // send the categorized events out, one batch per key
}
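
For reference, a minimal sketch of what the equality contract could look like. This assumes the categorization key holds the four metadata values as plain strings, extracted from the SinkRecord headers in the constructor as in the handler above; the actual field names and extraction logic in the project may differ.

import java.util.Objects;

// Sketch only: assumes the key class exposes the four header values
// (index, source, sourcetype, host) as String fields. How they are
// pulled out of the SinkRecord headers is left to the constructor.
public class SplunkSinkRecord {
    private final String index;
    private final String source;
    private final String sourcetype;
    private final String host;

    public SplunkSinkRecord(String index, String source, String sourcetype, String host) {
        this.index = index;
        this.source = source;
        this.sourcetype = sourcetype;
        this.host = host;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SplunkSinkRecord)) return false;
        SplunkSinkRecord other = (SplunkSinkRecord) o;
        return Objects.equals(index, other.index)
                && Objects.equals(source, other.source)
                && Objects.equals(sourcetype, other.sourcetype)
                && Objects.equals(host, other.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(index, source, sourcetype, host);
    }
}

With a consistent equals()/hashCode() pair, the HashMap groups all like records in a single O(n) pass regardless of their order in the collection, instead of cutting a new batch every time two adjacent records differ.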