Skip to content

Commit 7180b33

Browse files
author
Donald Tregonning
authored
Merge pull request #167 from splunk/issue-165-modify-record-sort-for-raw
Issue 165 modify record sort for raw
2 parents 8953a55 + 7906cc0 commit 7180b33

File tree

4 files changed

+121
-24
lines changed

4 files changed

+121
-24
lines changed

src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,9 @@ public void stickySessionHandler(HecChannel channel) {
209209
log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
210210
if (pollerCallback != null) {
211211
List<EventBatch> expired = new ArrayList<>();
212-
Iterator iter = channelBatches.entrySet().iterator();
212+
Iterator<Map.Entry<Long,EventBatch>> iter = channelBatches.entrySet().iterator();
213213
while(iter.hasNext()) {
214-
Map.Entry<Long, EventBatch> pair = (Map.Entry) iter.next();
214+
Map.Entry<Long, EventBatch> pair = iter.next();
215215
EventBatch batch = pair.getValue();
216216
totalOutstandingEventBatches.decrementAndGet();
217217
batch.fail();

src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
*/
1616
package com.splunk.kafka.connect;
1717

18+
import org.apache.commons.lang3.builder.HashCodeBuilder;
19+
import org.apache.commons.lang3.builder.EqualsBuilder;
1820
import org.apache.kafka.connect.header.Headers;
1921
import org.apache.kafka.connect.sink.SinkRecord;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2024

2125
/**
2226
* SplunkSinkRecord provides helper functionality to enable Header support for the Splunk Connect for Kafka, Namely
@@ -27,6 +31,7 @@
2731
* @since 1.1.0
2832
*/
2933
public class SplunkSinkRecord {
34+
private static final Logger log = LoggerFactory.getLogger(SplunkSinkRecord.class);
3035
Headers headers;
3136
SplunkSinkConnectorConfig connectorConfig;
3237
String splunkHeaderIndex = "";
@@ -48,7 +53,10 @@ public SplunkSinkRecord() {}
4853
/**
 * Creates a helper wrapping a Kafka {@link SinkRecord} together with the
 * connector configuration, extracting per-record Splunk metadata
 * (index/host/source/sourcetype) from the record headers when present.
 *
 * @param record          the Kafka Connect sink record to wrap
 * @param connectorConfig connector configuration naming the metadata headers
 */
public SplunkSinkRecord(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) {
    this.connectorConfig = connectorConfig;
    this.headers = record.headers();
    // Records without headers keep the empty-string metadata defaults.
    // (The previous per-record log.info("not null headers") was debug noise
    // at INFO level on the hot path and has been removed.)
    if (this.headers != null) {
        setMetadataValues();
    }
}
5361

5462
/**
@@ -75,10 +83,49 @@ protected boolean compareRecordHeaders(SinkRecord record) {
7583
}
7684

7785
private void setMetadataValues() {
78-
splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString();
79-
splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString();
80-
splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString();
81-
splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
86+
log.info("Made it to setMetadataValues");
87+
88+
if(this.headers.lastWithName(connectorConfig.headerIndex).value() != null) {
89+
splunkHeaderIndex = this.headers.lastWithName(connectorConfig.headerIndex).value().toString();
90+
}
91+
if(this.headers.lastWithName(connectorConfig.headerHost).value() != null) {
92+
splunkHeaderHost = this.headers.lastWithName(connectorConfig.headerHost).value().toString();
93+
}
94+
if(this.headers.lastWithName(connectorConfig.headerSource).value() != null) {
95+
splunkHeaderSource = this.headers.lastWithName(connectorConfig.headerSource).value().toString();
96+
}
97+
if(this.headers.lastWithName(connectorConfig.headerSourcetype).value() != null) {
98+
splunkHeaderSourcetype = this.headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
99+
}
100+
}
101+
102+
public String id() {
103+
return splunkHeaderIndex + "$$$" + splunkHeaderHost + "$$$"
104+
+ splunkHeaderSource + "$$$" + splunkHeaderSourcetype;
105+
}
106+
107+
@Override
108+
public int hashCode() {
109+
return new HashCodeBuilder()
110+
.append(splunkHeaderIndex)
111+
.append(splunkHeaderHost)
112+
.append(splunkHeaderSource)
113+
.append(splunkHeaderSourcetype)
114+
.toHashCode();
115+
}
116+
117+
@Override
118+
public boolean equals(Object obj) {
119+
if (obj instanceof SplunkSinkRecord) {
120+
final SplunkSinkRecord other = (SplunkSinkRecord) obj;
121+
return new EqualsBuilder()
122+
.append(splunkHeaderIndex, other.splunkHeaderIndex)
123+
.append(splunkHeaderHost, other.splunkHeaderHost)
124+
.append(splunkHeaderSource, other.splunkHeaderSource)
125+
.append(splunkHeaderSourcetype, other.splunkHeaderSourcetype)
126+
.isEquals();
127+
}
128+
return false;
82129
}
83130

84131
public Headers getHeaders() {

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private void preventTooManyOutstandingEvents() {
141141

142142
private void handleRaw(final Collection<SinkRecord> records) {
143143
if(connectorConfig.headerSupport) {
144-
handleRecordsWithHeader(records);
144+
if(records != null) { handleRecordsWithHeader(records); }
145145
}
146146

147147
else if (connectorConfig.hasMetaDataConfigured()) {
@@ -158,21 +158,63 @@ else if (connectorConfig.hasMetaDataConfigured()) {
158158
}
159159

160160
private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
161-
List<SinkRecord> recordsWithSameHeaders = new ArrayList<>();
162-
SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord();
161+
HashMap<String, ArrayList<SinkRecord>> recordsWithSameHeaders = new HashMap<>();
163162

164163
for (SinkRecord record : records) {
165-
if (splunkSinkRecord.compareRecordHeaders(record)) {
166-
recordsWithSameHeaders.add(record);
167-
continue;
164+
String key = headerId(record);
165+
if (!recordsWithSameHeaders.containsKey(key)) {
166+
ArrayList<SinkRecord> recordList = new ArrayList<SinkRecord>();
167+
recordsWithSameHeaders.put(key, recordList);
168168
}
169+
ArrayList<SinkRecord> recordList = recordsWithSameHeaders.get(key);
170+
recordList.add(record);
171+
recordsWithSameHeaders.put(key, recordList);
172+
}
173+
174+
int index = 0;
175+
Iterator<Map.Entry<String, ArrayList<SinkRecord>>> itr = recordsWithSameHeaders.entrySet().iterator();
176+
while(itr.hasNext()) {
177+
Map.Entry set = itr.next();
178+
String splunkSinkRecordKey = (String)set.getKey();
179+
ArrayList<SinkRecord> recordArrayList = (ArrayList)set.getValue();
180+
EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey);
181+
sendEvents(recordArrayList, batch);
182+
index++;
183+
}
184+
log.debug("{} records have been bucketed in to {} batches",records.size(), index);
185+
}
186+
187+
public String headerId(SinkRecord sinkRecord) {
188+
Headers headers = sinkRecord.headers();
189+
String headerId = "";
190+
191+
if(headers.lastWithName(connectorConfig.headerIndex) != null) {
192+
headerId += headers.lastWithName(connectorConfig.headerIndex).value().toString();
193+
}
194+
195+
headerId = insertheaderToken(headerId);
196+
197+
if(headers.lastWithName(connectorConfig.headerHost) != null) {
198+
headerId += headers.lastWithName(connectorConfig.headerHost).value().toString();
199+
}
200+
201+
headerId = insertheaderToken(headerId);
169202

170-
EventBatch batch = createRawHeaderEventBatch(splunkSinkRecord);
171-
sendEvents(recordsWithSameHeaders, batch);
172-
recordsWithSameHeaders.clear();
173-
recordsWithSameHeaders.add(record);
174-
splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
203+
if(headers.lastWithName(connectorConfig.headerSource) != null) {
204+
headerId += headers.lastWithName(connectorConfig.headerSource).value().toString();
175205
}
206+
207+
headerId = insertheaderToken(headerId);
208+
209+
if(headers.lastWithName(connectorConfig.headerSourcetype) != null) {
210+
headerId += headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
211+
}
212+
213+
return headerId;
214+
}
215+
216+
public String insertheaderToken(String id) {
217+
return id + "$$$";
176218
}
177219

178220
private void handleEvent(final Collection<SinkRecord> records) {
@@ -217,15 +259,17 @@ private void send(final EventBatch batch) {
217259
}
218260
}
219261

220-
private EventBatch createRawHeaderEventBatch(SplunkSinkRecord splunkSinkRecord) {
262+
private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) {
263+
String[] split = splunkSinkRecord.split("[$]{3}");
264+
221265
return RawEventBatch.factory()
222-
.setIndex(splunkSinkRecord.getSplunkHeaderIndex())
223-
.setSourcetype(splunkSinkRecord.getSplunkHeaderSourcetype())
224-
.setSource(splunkSinkRecord.getSplunkHeaderSource())
225-
.setHost(splunkSinkRecord.getSplunkHeaderHost())
266+
.setIndex(split[0])
267+
.setSourcetype(split[1])
268+
.setSource(split[2])
269+
.setHost(split[3])
226270
.build();
227-
228271
}
272+
229273
// setup metadata on RawEventBatch
230274
private EventBatch createRawEventBatch(final TopicPartition tp) {
231275
if (tp == null) {

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,12 @@ public void putWithRawAndAckWithoutMeta() {
226226
putWithSuccess(true, false);
227227
}
228228

229+
// TODO(review): assertion-free stub — it always passes and reports false
// coverage. Either exercise SplunkSinkTask.handleRecordsWithHeader()
// (records with matching headers grouped into one raw batch, differing
// headers into separate batches) or remove the stub.
@Test
public void handleRecordsWithHeaders() {

}
234+
229235
private void putWithSuccess(boolean raw, boolean withMeta) {
230236
int batchSize = 100;
231237
int total = 1000;

0 commit comments

Comments
 (0)