Skip to content

Commit 4e7ba60

Browse files
author
Don Tregonning
committed
issue-165 Modify raw batching functionality to use hashmap instad of list
1 parent 8953a55 commit 4e7ba60

File tree

2 files changed

+46
-12
lines changed

2 files changed

+46
-12
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.splunk.kafka.connect;
1717

18+
import org.apache.commons.lang3.builder.HashCodeBuilder;
19+
import org.apache.commons.lang3.builder.EqualsBuilder;
1820
import org.apache.kafka.connect.header.Headers;
1921
import org.apache.kafka.connect.sink.SinkRecord;
2022

@@ -81,6 +83,30 @@ private void setMetadataValues() {
8183
splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
8284
}
8385

86+
@Override
87+
public int hashCode() {
88+
return new HashCodeBuilder()
89+
.append(splunkHeaderIndex)
90+
.append(splunkHeaderHost)
91+
.append(splunkHeaderSource)
92+
.append(splunkHeaderSourcetype)
93+
.toHashCode();
94+
}
95+
96+
@Override
97+
public boolean equals(Object obj) {
98+
if (obj instanceof SplunkSinkRecord) {
99+
final SplunkSinkRecord other = (SplunkSinkRecord) obj;
100+
return new EqualsBuilder()
101+
.append(splunkHeaderIndex, other.splunkHeaderIndex)
102+
.append(splunkHeaderHost, other.splunkHeaderHost)
103+
.append(splunkHeaderSource, other.splunkHeaderSource)
104+
.append(splunkHeaderSourcetype, other.splunkHeaderSourcetype)
105+
.isEquals();
106+
}
107+
return false;
108+
}
109+
84110
public Headers getHeaders() {
85111
return headers;
86112
}

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

32+
import javax.net.ssl.SSLEngineResult;
33+
3234
public final class SplunkSinkTask extends SinkTask implements PollerCallback {
3335
private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class);
3436
private static final long flushWindow = 30 * 1000; // 30 seconds
@@ -158,20 +160,26 @@ else if (connectorConfig.hasMetaDataConfigured()) {
158160
}
159161

160162
private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
161-
List<SinkRecord> recordsWithSameHeaders = new ArrayList<>();
162-
SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord();
163-
163+
HashMap<SplunkSinkRecord, ArrayList<SinkRecord>> recordsWithSameHeaders = new HashMap<>();
164+
SplunkSinkRecord splunkSinkRecord;
164165
for (SinkRecord record : records) {
165-
if (splunkSinkRecord.compareRecordHeaders(record)) {
166-
recordsWithSameHeaders.add(record);
167-
continue;
168-
}
169-
170-
EventBatch batch = createRawHeaderEventBatch(splunkSinkRecord);
171-
sendEvents(recordsWithSameHeaders, batch);
172-
recordsWithSameHeaders.clear();
173-
recordsWithSameHeaders.add(record);
174166
splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
167+
168+
if (!recordsWithSameHeaders.containsKey(splunkSinkRecord)) {
169+
recordsWithSameHeaders.put(splunkSinkRecord, new ArrayList<>());
170+
}
171+
ArrayList<SinkRecord> recordList = recordsWithSameHeaders.get(record);
172+
recordList.add(record);
173+
recordsWithSameHeaders.put(splunkSinkRecord, recordList);
174+
}
175+
176+
Iterator itr = recordsWithSameHeaders.entrySet().iterator();
177+
while(itr.hasNext()) {
178+
Map.Entry set = (Map.Entry)itr.next();
179+
SplunkSinkRecord splunkSinkRecordKey = (SplunkSinkRecord)set.getKey();
180+
ArrayList<SinkRecord> recordArrayList = (ArrayList)set.getValue();
181+
EventBatch batch = createRawHeaderEventBatch(1);
182+
sendEvents(recordArrayList, batch);
175183
}
176184
}
177185

0 commit comments

Comments
 (0)