Skip to content

Commit d789ce1

Browse files
author
Donald Tregonning
authored
Merge pull request #162 from splunk/channel-expiration-logging
#137 - Handle sticky session expiration
2 parents b87acc1 + 33306e8 commit d789ce1

File tree

10 files changed

+159
-21
lines changed

10 files changed

+159
-21
lines changed

src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
132132
}
133133

134134
if (channelEvents.get(resp.getAckId()) != null) {
135-
log.error("ackId={} already exists for channel={} index={}", resp.getAckId(), channel, channel.getIndexer());
135+
log.warn("ackId={} already exists for channel={} index={} data may be duplicated in Splunk", resp.getAckId(), channel, channel.getIndexer());
136136
return;
137137
}
138138

@@ -184,6 +184,52 @@ public int getAckPollInterval() {
184184
return ackPollInterval;
185185
}
186186

187+
/**
188+
* StickySessionHandler is used to reassign channel id and fail the batches for that HecChannel.
189+
* Also, the HecChannel will be unavailable during this period.
190+
* The handler performs the following steps, in order:
191+
* 1) Set channel unavailable
192+
* 2) Get batches for the channel
193+
* 3) Remove batches for the channel from the poller
194+
* 4) Remove batches from kafka record tracker to fail them and resend
195+
* 5) Remove channel
196+
* 6) Change channel id
197+
* 7) Set channel available
198+
*
199+
* @param channel the HecChannel whose id has to be changed and whose batches have to be failed.
200+
* @see HecChannel
201+
* @since 1.1.0
202+
*/
203+
public void stickySessionHandler(HecChannel channel) {
204+
String oldChannelId = channel.getId();
205+
channel.setAvailable(false);
206+
log.info("Channel {} set to be not available", oldChannelId);
207+
ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
208+
if(channelBatches != null && channelBatches.size() > 0) {
209+
log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
210+
if (pollerCallback != null) {
211+
List<EventBatch> expired = new ArrayList<>();
212+
Iterator iter = channelBatches.entrySet().iterator();
213+
while(iter.hasNext()) {
214+
Map.Entry<Long, EventBatch> pair = (Map.Entry) iter.next();
215+
EventBatch batch = pair.getValue();
216+
totalOutstandingEventBatches.decrementAndGet();
217+
batch.fail();
218+
expired.add(batch);
219+
iter.remove();
220+
}
221+
pollerCallback.onEventFailure(expired, new HecException("sticky_session_expired"));
222+
}
223+
}
224+
outstandingEventBatches.remove(channel);
225+
channel.setId();
226+
String newChannelId = channel.getId();
227+
log.info("Changed channel id from {} to {}", oldChannelId, newChannelId);
228+
229+
channel.setAvailable(true);
230+
log.info("Channel {} is available", newChannelId);
231+
}
232+
187233
private void poll() {
188234
if (totalOutstandingEventBatches.get() <= 0 || outstandingEventBatches.size() <= 0) {
189235
return;
@@ -273,19 +319,22 @@ private void handleAckPollResult(HecChannel channel, HecAckPollResponse result)
273319

274320
List<EventBatch> committedBatches = new ArrayList<>();
275321
ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
276-
for (Long id: ids) {
277-
EventBatch batch = channelBatches.remove(id);
278-
if (batch == null) {
279-
log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
280-
continue;
322+
// Null check: channelBatches may still be null here. The channel's entry may have been removed while handling sticky sessions, and it is not added back until more data is sent.
323+
if (channelBatches != null) {
324+
for (Long id: ids) {
325+
EventBatch batch = channelBatches.remove(id);
326+
if (batch == null) {
327+
log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
328+
continue;
329+
}
330+
totalOutstandingEventBatches.decrementAndGet();
331+
batch.commit();
332+
committedBatches.add(batch);
281333
}
282-
totalOutstandingEventBatches.decrementAndGet();
283-
batch.commit();
284-
committedBatches.add(batch);
285-
}
286334

287-
if (!committedBatches.isEmpty() && pollerCallback != null) {
288-
pollerCallback.onEventCommitted(committedBatches);
335+
if (!committedBatches.isEmpty() && pollerCallback != null) {
336+
pollerCallback.onEventCommitted(committedBatches);
337+
}
289338
}
290339
}
291340

src/main/java/com/splunk/hecclient/HecChannel.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ final class HecChannel {
2424
private String id;
2525
private Map<String, String> chField;
2626
private IndexerInf indexer;
27+
private boolean isAvailable;
2728

2829
public HecChannel(IndexerInf idx) {
2930
id = newChannelId();
3031
indexer = idx;
32+
isAvailable = true;
3133
}
3234

3335
public IndexerInf getIndexer() {
@@ -48,6 +50,10 @@ public HecChannel setTracking(boolean trackChannel) {
4850
return this;
4951
}
5052

53+
public void setId() { id = newChannelId(); }
54+
55+
public void setAvailable(boolean isAvailable) { this.isAvailable = isAvailable; }
56+
5157
public void send(final EventBatch batch) {
5258
if (chField != null) {
5359
batch.addExtraFields(chField);
@@ -60,9 +66,9 @@ public String executeHttpRequest(final HttpUriRequest req) {
6066
return indexer.executeHttpRequest(req);
6167
}
6268

63-
public boolean hasBackPressure() {
64-
return indexer.hasBackPressure();
65-
}
69+
public boolean hasBackPressure() { return indexer.hasBackPressure(); }
70+
71+
public boolean isNotAvailable() { return isAvailable == false; }
6672

6773
@Override
6874
public boolean equals(Object obj) {

src/main/java/com/splunk/hecclient/Indexer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
import java.io.IOException;
32+
import java.util.concurrent.ConcurrentHashMap;
3233

3334
final class Indexer implements IndexerInf {
3435
private static final Logger log = LoggerFactory.getLogger(Indexer.class);
@@ -124,9 +125,9 @@ public boolean send(final EventBatch batch) {
124125
}
125126

126127
// we are all good
127-
poller.add(channel, batch, resp);
128-
log.debug("sent {} events to splunk through channel={} indexer={}",
129-
batch.size(), channel.getId(), getBaseUrl());
128+
poller.add(this.channel, batch, resp);
129+
log.debug("sent {} events to splunk through channel={} indexer={}", batch.size(), channel.getId(), getBaseUrl());
130+
130131
return true;
131132
}
132133

@@ -161,7 +162,13 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
161162
}
162163
}
163164

164-
// log.info("event posting, channel={}, cookies={}", channel, resp.getHeaders("Set-Cookie"));
165+
// log.info("event posting, channel={}, cookies={}, cookies.length={}", channel, resp.getHeaders("Set-Cookie"), resp.getHeaders("Set-Cookie").length);
166+
167+
if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) {
168+
log.info("Sticky session expiry detected, will cleanup old channel and its associated batches");
169+
poller.stickySessionHandler(channel);
170+
}
171+
165172
int status = resp.getStatusLine().getStatusCode();
166173
// FIXME 503 server is busy backpressure
167174
if (status != 200 && status != 201) {

src/main/java/com/splunk/hecclient/LoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void send(final EventBatch batch) {
5252
for (int tried = 0; tried != channels.size(); tried++) {
5353
HecChannel channel = channels.get(index);
5454
index = (index + 1) % channels.size();
55-
if (!channel.hasBackPressure()) {
55+
if (!channel.hasBackPressure() && !channel.isNotAvailable()) {
5656
channel.send(batch);
5757
return;
5858
}

src/main/java/com/splunk/hecclient/Poller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public interface Poller {
2020
void stop();
2121
void add(HecChannel channel, EventBatch batch, String response);
2222
void fail(HecChannel channel, EventBatch batch, Exception ex);
23-
23+
void stickySessionHandler(HecChannel channel);
2424
// minimum load channel
2525
HecChannel getMinLoadChannel();
2626
long getTotalOutstandingEventBatches();

src/main/java/com/splunk/hecclient/ResponsePoller.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public ResponsePoller(PollerCallback callback) {
3131
this.callback = callback;
3232
}
3333

34+
@Override
35+
public void stickySessionHandler(HecChannel channel) {}
36+
3437
@Override
3538
public void start() {
3639
}

src/test/java/com/splunk/hecclient/HecAckPollerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,4 +323,36 @@ public void getMinLoadChannel() {
323323

324324
poller.stop();
325325
}
326+
327+
@Test
328+
public void stickySessionHandler() {
329+
PollerCallbackMock cb = new PollerCallbackMock();
330+
HecAckPoller poller = new HecAckPoller(cb);
331+
poller.setAckPollThreads(1);
332+
poller.setAckPollInterval(2);
333+
poller.start();
334+
335+
IndexerMock indexer = new IndexerMock();
336+
String ackResponse = "{\"acks\":{\"1\":true}}";
337+
indexer.setResponse(ackResponse);
338+
339+
HecChannel ch = new HecChannel(indexer);
340+
EventBatch batch = UnitUtil.createBatch();
341+
342+
String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}";
343+
poller.add(ch, batch, response);
344+
345+
long outstanding = poller.getTotalOutstandingEventBatches();
346+
Assert.assertEquals(1, outstanding);
347+
UnitUtil.milliSleep(3000);
348+
349+
String oldId = ch.getId();
350+
poller.stickySessionHandler(ch);
351+
Assert.assertNotEquals(oldId, ch.getId());
352+
353+
outstanding = poller.getTotalOutstandingEventBatches();
354+
Assert.assertEquals(0, outstanding);
355+
356+
poller.stop();
357+
}
326358
}

src/test/java/com/splunk/hecclient/HecChannelTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ public void getterSetter() {
3636

3737
Assert.assertEquals(id, ch.toString());
3838
Assert.assertNotNull(ch.hashCode());
39+
40+
Assert.assertFalse(ch.isNotAvailable());
41+
ch.setAvailable(true);
42+
Assert.assertFalse(ch.isNotAvailable());
43+
ch.setAvailable(false);
44+
Assert.assertTrue(ch.isNotAvailable());
45+
46+
ch.setId();
47+
String newId = ch.getId();
48+
Assert.assertNotNull(newId);
49+
Assert.assertFalse(newId.isEmpty());
50+
Assert.assertNotEquals(id, newId);
3951
}
4052

4153
@Test

src/test/java/com/splunk/hecclient/LoadBalancerTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,32 @@ public void sendWithOneBackPressure() {
102102
Assert.assertEquals(6, indexers.get(2).getBatches().size());
103103
}
104104

105+
@Test
106+
public void sendWithOneNotAvailable() {
107+
LoadBalancer lb = new LoadBalancer();
108+
List<IndexerMock> indexers = new ArrayList<>();
109+
110+
int numberOfChannels = 3;
111+
for (int i = 0; i < numberOfChannels; i++) {
112+
IndexerMock indexer = new IndexerMock();
113+
indexers.add(indexer);
114+
HecChannel ch = new HecChannel(indexer);
115+
if(i == 0) {
116+
ch.setAvailable(false);
117+
}
118+
lb.add(ch);
119+
}
120+
121+
int numberOfBatches = 12;
122+
for (int i = 0; i < numberOfBatches; i++) {
123+
lb.send(UnitUtil.createBatch());
124+
}
125+
126+
Assert.assertEquals(0, indexers.get(0).getBatches().size());
127+
Assert.assertEquals(6, indexers.get(1).getBatches().size());
128+
Assert.assertEquals(6, indexers.get(2).getBatches().size());
129+
}
130+
105131
@Test(expected = HecException.class)
106132
public void sendWithoutChannels() {
107133
LoadBalancer lb = new LoadBalancer();

src/test/java/com/splunk/hecclient/PollerMock.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ public void add(HecChannel channel, EventBatch batch, String resp) {
6262
this.response = resp;
6363
}
6464

65+
@Override
66+
public void stickySessionHandler(HecChannel channel) {}
67+
6568
public boolean isStarted() {
6669
return started;
6770
}

0 commit comments

Comments
 (0)