Skip to content

Commit 8c709a3

Browse files
author
Donald Tregonning
authored
Merge branch 'develop' into issue-138/header-support
2 parents 5e79d8a + d789ce1 commit 8c709a3

File tree

17 files changed

+264
-26
lines changed

17 files changed

+264
-26
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
# Compiled class file
23
*.class
34

@@ -27,4 +28,5 @@ target/*
2728
splunk-kafka-connect/
2829
pom.xml.versionsBackup
2930
.classpath
30-
.project
31+
.project
32+
*.iml

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22

33
# variables
4-
kafkaversion=0.11.0.2
4+
kafkaversion=2.0.0
55
builddir=/tmp/splunk-kafka-connect-build/splunk-kafka-connect
66

77
githash=`git rev-parse --short HEAD 2>/dev/null | sed "s/\(.*\)/@\1/"` # get current git hash

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.github.splunk.kafka.connect</groupId>
88
<artifactId>splunk-kafka-connect</artifactId>
9-
<version>v1.0.0</version>
9+
<version>v1.1.0</version>
1010
<name>splunk-kafka-connect</name>
1111

1212
<properties>

src/main/java/com/splunk/hecclient/Event.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.fasterxml.jackson.annotation.JsonInclude;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
22+
import com.fasterxml.jackson.databind.util.StdDateFormat;
2223
import org.slf4j.*;
2324

2425
import java.io.*;
@@ -43,7 +44,13 @@ public abstract class Event {
4344
static final String SOURCE = "source";
4445
static final String SOURCETYPE = "sourcetype";
4546

46-
static final ObjectMapper jsonMapper = new ObjectMapper();
47+
static final ObjectMapper jsonMapper;
48+
static {
49+
jsonMapper = new ObjectMapper();
50+
jsonMapper.registerModule(new com.splunk.kafka.connect.JacksonStructModule());
51+
jsonMapper.setDateFormat(StdDateFormat.instance);
52+
}
53+
4754
protected static final Logger log = LoggerFactory.getLogger(Event.class);
4855

4956
@JsonSerialize(using = DoubleSerializer.class)

src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
132132
}
133133

134134
if (channelEvents.get(resp.getAckId()) != null) {
135-
log.error("ackId={} already exists for channel={} index={}", resp.getAckId(), channel, channel.getIndexer());
135+
log.warn("ackId={} already exists for channel={} index={} data may be duplicated in Splunk", resp.getAckId(), channel, channel.getIndexer());
136136
return;
137137
}
138138

@@ -184,6 +184,52 @@ public int getAckPollInterval() {
184184
return ackPollInterval;
185185
}
186186

187+
/**
188+
* StickySessionHandler is used to reassign channel id and fail the batches for that HecChannel.
189+
* Also, the HecChannel will be unavailable during this period.
190+
* StickySessionHandler follows the following flow:
191+
* 1) Set channel unavailable
192+
* 2) Get batches for the channel
193+
* 3) Remove batches for the channel from the poller
194+
* 4) Remove batches from kafka record tracker to fail them and resend
195+
* 5) Remove channel
196+
* 6) Change channel id
197+
* 7) Set channel available
198+
*
199+
* @param channel HecChannel is the channel for which id has tobe changed and batches have to be failed.
200+
* @see HecChannel
201+
* @since 1.1.0
202+
*/
203+
public void stickySessionHandler(HecChannel channel) {
204+
String oldChannelId = channel.getId();
205+
channel.setAvailable(false);
206+
log.info("Channel {} set to be not available", oldChannelId);
207+
ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
208+
if(channelBatches != null && channelBatches.size() > 0) {
209+
log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
210+
if (pollerCallback != null) {
211+
List<EventBatch> expired = new ArrayList<>();
212+
Iterator iter = channelBatches.entrySet().iterator();
213+
while(iter.hasNext()) {
214+
Map.Entry<Long, EventBatch> pair = (Map.Entry) iter.next();
215+
EventBatch batch = pair.getValue();
216+
totalOutstandingEventBatches.decrementAndGet();
217+
batch.fail();
218+
expired.add(batch);
219+
iter.remove();
220+
}
221+
pollerCallback.onEventFailure(expired, new HecException("sticky_session_expired"));
222+
}
223+
}
224+
outstandingEventBatches.remove(channel);
225+
channel.setId();
226+
String newChannelId = channel.getId();
227+
log.info("Changed channel id from {} to {}", oldChannelId, newChannelId);
228+
229+
channel.setAvailable(true);
230+
log.info("Channel {} is available", newChannelId);
231+
}
232+
187233
private void poll() {
188234
if (totalOutstandingEventBatches.get() <= 0 || outstandingEventBatches.size() <= 0) {
189235
return;
@@ -273,19 +319,22 @@ private void handleAckPollResult(HecChannel channel, HecAckPollResponse result)
273319

274320
List<EventBatch> committedBatches = new ArrayList<>();
275321
ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
276-
for (Long id: ids) {
277-
EventBatch batch = channelBatches.remove(id);
278-
if (batch == null) {
279-
log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
280-
continue;
322+
// Added null check as channelBatches might still be null(It may be removed while handling sticky sessions and not added until we send more data)
323+
if (channelBatches != null) {
324+
for (Long id: ids) {
325+
EventBatch batch = channelBatches.remove(id);
326+
if (batch == null) {
327+
log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
328+
continue;
329+
}
330+
totalOutstandingEventBatches.decrementAndGet();
331+
batch.commit();
332+
committedBatches.add(batch);
281333
}
282-
totalOutstandingEventBatches.decrementAndGet();
283-
batch.commit();
284-
committedBatches.add(batch);
285-
}
286334

287-
if (!committedBatches.isEmpty() && pollerCallback != null) {
288-
pollerCallback.onEventCommitted(committedBatches);
335+
if (!committedBatches.isEmpty() && pollerCallback != null) {
336+
pollerCallback.onEventCommitted(committedBatches);
337+
}
289338
}
290339
}
291340

src/main/java/com/splunk/hecclient/HecChannel.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ final class HecChannel {
2424
private String id;
2525
private Map<String, String> chField;
2626
private IndexerInf indexer;
27+
private boolean isAvailable;
2728

2829
public HecChannel(IndexerInf idx) {
2930
id = newChannelId();
3031
indexer = idx;
32+
isAvailable = true;
3133
}
3234

3335
public IndexerInf getIndexer() {
@@ -48,6 +50,10 @@ public HecChannel setTracking(boolean trackChannel) {
4850
return this;
4951
}
5052

53+
public void setId() { id = newChannelId(); }
54+
55+
public void setAvailable(boolean isAvailable) { this.isAvailable = isAvailable; }
56+
5157
public void send(final EventBatch batch) {
5258
if (chField != null) {
5359
batch.addExtraFields(chField);
@@ -60,9 +66,9 @@ public String executeHttpRequest(final HttpUriRequest req) {
6066
return indexer.executeHttpRequest(req);
6167
}
6268

63-
public boolean hasBackPressure() {
64-
return indexer.hasBackPressure();
65-
}
69+
public boolean hasBackPressure() { return indexer.hasBackPressure(); }
70+
71+
public boolean isNotAvailable() { return isAvailable == false; }
6672

6773
@Override
6874
public boolean equals(Object obj) {

src/main/java/com/splunk/hecclient/HttpClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public HttpClientBuilder setSocketTimeout(int timeout /*seconds*/) {
5353
}
5454

5555
public HttpClientBuilder setSocketSendBufferSize(int bufSize /*bytes*/) {
56-
this.socketSendBufferSize = socketSendBufferSize;
56+
this.socketSendBufferSize = bufSize;
5757
return this;
5858
}
5959

src/main/java/com/splunk/hecclient/Indexer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
import java.io.IOException;
32+
import java.util.concurrent.ConcurrentHashMap;
3233

3334
final class Indexer implements IndexerInf {
3435
private static final Logger log = LoggerFactory.getLogger(Indexer.class);
@@ -124,9 +125,9 @@ public boolean send(final EventBatch batch) {
124125
}
125126

126127
// we are all good
127-
poller.add(channel, batch, resp);
128-
log.debug("sent {} events to splunk through channel={} indexer={}",
129-
batch.size(), channel.getId(), getBaseUrl());
128+
poller.add(this.channel, batch, resp);
129+
log.debug("sent {} events to splunk through channel={} indexer={}", batch.size(), channel.getId(), getBaseUrl());
130+
130131
return true;
131132
}
132133

@@ -161,7 +162,13 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
161162
}
162163
}
163164

164-
// log.info("event posting, channel={}, cookies={}", channel, resp.getHeaders("Set-Cookie"));
165+
// log.info("event posting, channel={}, cookies={}, cookies.length={}", channel, resp.getHeaders("Set-Cookie"), resp.getHeaders("Set-Cookie").length);
166+
167+
if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) {
168+
log.info("Sticky session expiry detected, will cleanup old channel and its associated batches");
169+
poller.stickySessionHandler(channel);
170+
}
171+
165172
int status = resp.getStatusLine().getStatusCode();
166173
// FIXME 503 server is busy backpressure
167174
if (status != 200 && status != 201) {

src/main/java/com/splunk/hecclient/LoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void send(final EventBatch batch) {
5252
for (int tried = 0; tried != channels.size(); tried++) {
5353
HecChannel channel = channels.get(index);
5454
index = (index + 1) % channels.size();
55-
if (!channel.hasBackPressure()) {
55+
if (!channel.hasBackPressure() && !channel.isNotAvailable()) {
5656
channel.send(batch);
5757
return;
5858
}

src/main/java/com/splunk/hecclient/Poller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public interface Poller {
2020
void stop();
2121
void add(HecChannel channel, EventBatch batch, String response);
2222
void fail(HecChannel channel, EventBatch batch, Exception ex);
23-
23+
void stickySessionHandler(HecChannel channel);
2424
// minimum load channel
2525
HecChannel getMinLoadChannel();
2626
long getTotalOutstandingEventBatches();

0 commit comments

Comments
 (0)