
Commit 7906cc0

Don Tregonning committed
issue-165 - fixed categorization of like Headers
1 parent 4e1372a commit 7906cc0

File tree: 3 files changed, +28 -32 lines changed


src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -209,9 +209,9 @@ public void stickySessionHandler(HecChannel channel) {
         log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
         if (pollerCallback != null) {
             List<EventBatch> expired = new ArrayList<>();
-            Iterator iter = channelBatches.entrySet().iterator();
+            Iterator<Map.Entry<Long,EventBatch>> iter = channelBatches.entrySet().iterator();
             while(iter.hasNext()) {
-                Map.Entry<Long, EventBatch> pair = (Map.Entry) iter.next();
+                Map.Entry<Long, EventBatch> pair = iter.next();
                 EventBatch batch = pair.getValue();
                 totalOutstandingEventBatches.decrementAndGet();
                 batch.fail();
```
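The typed iterator above removes an unchecked cast: a raw `Iterator` yields `Object` and forces a `(Map.Entry)` cast at each step, while the parameterized form is verified at compile time. A minimal, self-contained sketch of the same pattern (using a stand-in `EventBatch` stub, since the real class lives in `com.splunk.hecclient`):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TypedIteratorSketch {
    // Stand-in stub for com.splunk.hecclient.EventBatch.
    static class EventBatch {}

    public static void main(String[] args) {
        Map<Long, EventBatch> channelBatches = new HashMap<>();
        channelBatches.put(42L, new EventBatch());

        // Parameterized iterator: next() already returns Map.Entry<Long, EventBatch>,
        // so no cast (and no unchecked warning) is needed.
        Iterator<Map.Entry<Long, EventBatch>> iter = channelBatches.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, EventBatch> pair = iter.next();
            System.out.println(pair.getKey() + " -> " + pair.getValue());
        }
    }
}
```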

src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -100,7 +100,8 @@ private void setMetadataValues() {
     }

     public String id() {
-        return
+        return splunkHeaderIndex + "$$$" + splunkHeaderHost + "$$$"
+               + splunkHeaderSource + "$$$" + splunkHeaderSourcetype;
     }

     @Override
```
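To make the new key format concrete: `id()` joins the four Splunk metadata fields with a `$$$` sentinel, in index, host, source, sourcetype order. A sketch with hypothetical values (in `SplunkSinkRecord` these fields are populated from the record's headers):

```java
public class IdSketch {
    public static void main(String[] args) {
        // Hypothetical metadata values standing in for the SplunkSinkRecord fields.
        String splunkHeaderIndex = "main";
        String splunkHeaderHost = "host-01";
        String splunkHeaderSource = "kafka";
        String splunkHeaderSourcetype = "_json";

        String id = splunkHeaderIndex + "$$$" + splunkHeaderHost + "$$$"
                + splunkHeaderSource + "$$$" + splunkHeaderSourcetype;
        System.out.println(id); // prints: main$$$host-01$$$kafka$$$_json
    }
}
```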

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 24 additions & 29 deletions
```diff
@@ -16,7 +16,6 @@
 package com.splunk.kafka.connect;

 import com.splunk.hecclient.*;
-import com.sun.tools.corba.se.idl.StringGen;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -30,8 +29,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.net.ssl.SSLEngineResult;
-
 public final class SplunkSinkTask extends SinkTask implements PollerCallback {
     private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class);
     private static final long flushWindow = 30 * 1000; // 30 seconds
@@ -161,67 +158,63 @@ else if (connectorConfig.hasMetaDataConfigured()) {
     }

     private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
-        log.info("Inside handle records");
-
         HashMap<String, ArrayList<SinkRecord>> recordsWithSameHeaders = new HashMap<>();
-        SplunkSinkRecord splunkSinkRecord;
-        for (SinkRecord record : records) {
-            log.info("Inside loop");

-            // splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
+        for (SinkRecord record : records) {
             String key = headerId(record);
             if (!recordsWithSameHeaders.containsKey(key)) {
-                recordsWithSameHeaders.put(key, new ArrayList<>());
+                ArrayList<SinkRecord> recordList = new ArrayList<SinkRecord>();
+                recordsWithSameHeaders.put(key, recordList);
             }
-            ArrayList<SinkRecord> recordList = recordsWithSameHeaders.get(record);
+            ArrayList<SinkRecord> recordList = recordsWithSameHeaders.get(key);
             recordList.add(record);
             recordsWithSameHeaders.put(key, recordList);
         }

         int index = 0;
-        Iterator itr = recordsWithSameHeaders.entrySet().iterator();
+        Iterator<Map.Entry<String, ArrayList<SinkRecord>>> itr = recordsWithSameHeaders.entrySet().iterator();
         while(itr.hasNext()) {
-            log.info("Sending Log {}", index);
-            Map.Entry set = (Map.Entry)itr.next();
-            SplunkSinkRecord splunkSinkRecordKey = (SplunkSinkRecord)set.getKey();
+            Map.Entry set = itr.next();
+            String splunkSinkRecordKey = (String)set.getKey();
             ArrayList<SinkRecord> recordArrayList = (ArrayList)set.getValue();
             EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey);
             sendEvents(recordArrayList, batch);
             index++;
         }
+        log.debug("{} records have been bucketed in to {} batches",records.size(), index);
     }

     public String headerId(SinkRecord sinkRecord) {
         Headers headers = sinkRecord.headers();
         String headerId = "";

-        if(headers.lastWithName(connectorConfig.headerIndex).value() != null) {
+        if(headers.lastWithName(connectorConfig.headerIndex) != null) {
             headerId += headers.lastWithName(connectorConfig.headerIndex).value().toString();
         }

-        insertheaderToken(headerId);
+        headerId = insertheaderToken(headerId);

-        if(headers.lastWithName(connectorConfig.headerHost).value() != null) {
+        if(headers.lastWithName(connectorConfig.headerHost) != null) {
             headerId += headers.lastWithName(connectorConfig.headerHost).value().toString();
         }

-        insertheaderToken(headerId);
+        headerId = insertheaderToken(headerId);

-        if(headers.lastWithName(connectorConfig.headerSource).value() != null) {
+        if(headers.lastWithName(connectorConfig.headerSource) != null) {
             headerId += headers.lastWithName(connectorConfig.headerSource).value().toString();
         }

-        insertheaderToken(headerId);
+        headerId = insertheaderToken(headerId);

-        if(headers.lastWithName(connectorConfig.headerSourcetype).value() != null) {
+        if(headers.lastWithName(connectorConfig.headerSourcetype) != null) {
             headerId += headers.lastWithName(connectorConfig.headerSourcetype).value().toString();
         }

         return headerId;
     }

     public String insertheaderToken(String id) {
-        return id += "$$$";
+        return id + "$$$";
     }

     private void handleEvent(final Collection<SinkRecord> records) {
```
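Three fixes in this hunk are easy to miss. The map lookup now uses `get(key)` instead of `get(record)`: a `SinkRecord` can never equal a `String` key, so the old lookup always returned null and the following `recordList.add(record)` threw a `NullPointerException`. The header guards now null-check `lastWithName(...)` itself, since it returns null when a header is absent, and calling `.value()` on that result also threw. And `insertheaderToken(headerId)` now assigns its result back: Java strings are immutable and arguments are passed by value, so the bare call discarded the appended token, which is presumably how distinct header combinations ended up categorized together. A minimal sketch of that last point:

```java
public class TokenSketch {
    static String insertheaderToken(String id) {
        return id + "$$$";
    }

    public static void main(String[] args) {
        String headerId = "main";

        insertheaderToken(headerId);             // return value discarded: headerId unchanged
        System.out.println(headerId);            // prints: main

        headerId = insertheaderToken(headerId);  // capture the returned string
        System.out.println(headerId);            // prints: main$$$
    }
}
```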
```diff
@@ -266,15 +259,17 @@ private void send(final EventBatch batch) {
         }
     }

-    private EventBatch createRawHeaderEventBatch(SplunkSinkRecord splunkSinkRecord) {
+    private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) {
+        String[] split = splunkSinkRecord.split("[$]{3}");
+
         return RawEventBatch.factory()
-                .setIndex(splunkSinkRecord.getSplunkHeaderIndex())
-                .setSourcetype(splunkSinkRecord.getSplunkHeaderSourcetype())
-                .setSource(splunkSinkRecord.getSplunkHeaderSource())
-                .setHost(splunkSinkRecord.getSplunkHeaderHost())
+                .setIndex(split[0])
+                .setSourcetype(split[1])
+                .setSource(split[2])
+                .setHost(split[3])
                 .build();
-
     }
+
     // setup metadata on RawEventBatch
     private EventBatch createRawEventBatch(final TopicPartition tp) {
         if (tp == null) {
```
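A round-trip sketch of the key encoding: `headerId(...)` concatenates index, host, source, and sourcetype with the `$$$` token, and `createRawHeaderEventBatch` recovers the fields with `split("[$]{3}")`, a regex matching a run of exactly three literal `$` characters. The split positions therefore follow the construction order (hypothetical values again):

```java
public class SplitSketch {
    public static void main(String[] args) {
        // Key built the way headerId(...) builds it:
        // index, then host, then source, then sourcetype.
        String key = "main" + "$$$" + "host-01" + "$$$" + "kafka" + "$$$" + "_json";

        // "[$]{3}" matches exactly three literal '$' characters.
        String[] split = key.split("[$]{3}");

        System.out.println(split[0]); // main     (index)
        System.out.println(split[1]); // host-01  (host)
        System.out.println(split[2]); // kafka    (source)
        System.out.println(split[3]); // _json    (sourcetype)
    }
}
```

Note that `headerId` writes host at position 1 and sourcetype at position 3, while the batch builder above pairs `split[1]` with `setSourcetype` and `split[3]` with `setHost`.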
