From 33306e817ae8427c679ba7925c3d52deda327165 Mon Sep 17 00:00:00 2001 From: Chaitanya Phalak Date: Wed, 15 Aug 2018 17:57:51 -0700 Subject: [PATCH] INGEST-1513: Handle sticky session expiration --- .../com/splunk/hecclient/HecAckPoller.java | 73 ++++++++++++++++--- .../java/com/splunk/hecclient/HecChannel.java | 12 ++- .../java/com/splunk/hecclient/Indexer.java | 15 +++- .../com/splunk/hecclient/LoadBalancer.java | 2 +- .../java/com/splunk/hecclient/Poller.java | 2 +- .../com/splunk/hecclient/ResponsePoller.java | 3 + .../splunk/hecclient/HecAckPollerTest.java | 32 ++++++++ .../com/splunk/hecclient/HecChannelTest.java | 12 +++ .../splunk/hecclient/LoadBalancerTest.java | 26 +++++++ .../java/com/splunk/hecclient/PollerMock.java | 3 + 10 files changed, 159 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java index bd5deb04..0fcfafa9 100644 --- a/src/main/java/com/splunk/hecclient/HecAckPoller.java +++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java @@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) { } if (channelEvents.get(resp.getAckId()) != null) { - log.error("ackId={} already exists for channel={} index={}", resp.getAckId(), channel, channel.getIndexer()); + log.warn("ackId={} already exists for channel={} index={} data may be duplicated in Splunk", resp.getAckId(), channel, channel.getIndexer()); return; } @@ -184,6 +184,52 @@ public int getAckPollInterval() { return ackPollInterval; } + /** + * StickySessionHandler is used to reassign channel id and fail the batches for that HecChannel. + * Also, the HecChannel will be unavailable during this period. 
+ * StickySessionHandler follows the following flow: + * 1) Set channel unavailable + * 2) Get batches for the channel + * 3) Remove batches for the channel from the poller + * 4) Remove batches from kafka record tracker to fail them and resend + * 5) Remove channel + * 6) Change channel id + * 7) Set channel available + * + * @param channel HecChannel is the channel for which id has to be changed and batches have to be failed. + * @see HecChannel + * @since 1.1.0 + */ + public void stickySessionHandler(HecChannel channel) { + String oldChannelId = channel.getId(); + channel.setAvailable(false); + log.info("Channel {} set to be not available", oldChannelId); + ConcurrentHashMap channelBatches = outstandingEventBatches.get(channel); + if(channelBatches != null && channelBatches.size() > 0) { + log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId); + if (pollerCallback != null) { + List expired = new ArrayList<>(); + Iterator iter = channelBatches.entrySet().iterator(); + while(iter.hasNext()) { + Map.Entry pair = (Map.Entry) iter.next(); + EventBatch batch = pair.getValue(); + totalOutstandingEventBatches.decrementAndGet(); + batch.fail(); + expired.add(batch); + iter.remove(); + } + pollerCallback.onEventFailure(expired, new HecException("sticky_session_expired")); + } + } + outstandingEventBatches.remove(channel); + channel.setId(); + String newChannelId = channel.getId(); + log.info("Changed channel id from {} to {}", oldChannelId, newChannelId); + + channel.setAvailable(true); + log.info("Channel {} is available", newChannelId); + } + private void poll() { if (totalOutstandingEventBatches.get() <= 0 || outstandingEventBatches.size() <= 0) { return; @@ -273,19 +319,22 @@ private void handleAckPollResult(HecChannel channel, HecAckPollResponse result) List committedBatches = new ArrayList<>(); ConcurrentHashMap channelBatches = outstandingEventBatches.get(channel); - for (Long id: ids) { - 
EventBatch batch = channelBatches.remove(id); - if (batch == null) { - log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer()); - continue; + // Added null check as channelBatches might still be null (it may be removed while handling sticky sessions and not re-added until we send more data) + if (channelBatches != null) { + for (Long id: ids) { + EventBatch batch = channelBatches.remove(id); + if (batch == null) { + log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer()); + continue; + } + totalOutstandingEventBatches.decrementAndGet(); + batch.commit(); + committedBatches.add(batch); } - totalOutstandingEventBatches.decrementAndGet(); - batch.commit(); - committedBatches.add(batch); - } - if (!committedBatches.isEmpty() && pollerCallback != null) { - pollerCallback.onEventCommitted(committedBatches); + if (!committedBatches.isEmpty() && pollerCallback != null) { + pollerCallback.onEventCommitted(committedBatches); + } } } diff --git a/src/main/java/com/splunk/hecclient/HecChannel.java b/src/main/java/com/splunk/hecclient/HecChannel.java index e624a0a7..c7f49805 100644 --- a/src/main/java/com/splunk/hecclient/HecChannel.java +++ b/src/main/java/com/splunk/hecclient/HecChannel.java @@ -24,10 +24,12 @@ final class HecChannel { private String id; private Map chField; private IndexerInf indexer; + private boolean isAvailable; public HecChannel(IndexerInf idx) { id = newChannelId(); indexer = idx; + isAvailable = true; } public IndexerInf getIndexer() { @@ -48,6 +50,10 @@ public HecChannel setTracking(boolean trackChannel) { return this; } + public void setId() { id = newChannelId(); } + + public void setAvailable(boolean isAvailable) { this.isAvailable = isAvailable; } + public void send(final EventBatch batch) { if (chField != null) { batch.addExtraFields(chField); @@ -60,9 +66,9 @@ public String executeHttpRequest(final HttpUriRequest req) { return 
indexer.executeHttpRequest(req); } - public boolean hasBackPressure() { - return indexer.hasBackPressure(); - } + public boolean hasBackPressure() { return indexer.hasBackPressure(); } + + public boolean isNotAvailable() { return isAvailable == false; } @Override public boolean equals(Object obj) { diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index f8b681db..1025554a 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; final class Indexer implements IndexerInf { private static final Logger log = LoggerFactory.getLogger(Indexer.class); @@ -124,9 +125,9 @@ public boolean send(final EventBatch batch) { } // we are all good - poller.add(channel, batch, resp); - log.debug("sent {} events to splunk through channel={} indexer={}", - batch.size(), channel.getId(), getBaseUrl()); + poller.add(this.channel, batch, resp); + log.debug("sent {} events to splunk through channel={} indexer={}", batch.size(), channel.getId(), getBaseUrl()); + return true; } @@ -161,7 +162,13 @@ private String readAndCloseResponse(CloseableHttpResponse resp) { } } - // log.info("event posting, channel={}, cookies={}", channel, resp.getHeaders("Set-Cookie")); +// log.info("event posting, channel={}, cookies={}, cookies.length={}", channel, resp.getHeaders("Set-Cookie"), resp.getHeaders("Set-Cookie").length); + + if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) { + log.info("Sticky session expiry detected, will cleanup old channel and its associated batches"); + poller.stickySessionHandler(channel); + } + int status = resp.getStatusLine().getStatusCode(); // FIXME 503 server is busy backpressure if (status != 200 && status != 201) { diff --git a/src/main/java/com/splunk/hecclient/LoadBalancer.java 
b/src/main/java/com/splunk/hecclient/LoadBalancer.java index d43d256a..3f7ffc7c 100644 --- a/src/main/java/com/splunk/hecclient/LoadBalancer.java +++ b/src/main/java/com/splunk/hecclient/LoadBalancer.java @@ -52,7 +52,7 @@ public void send(final EventBatch batch) { for (int tried = 0; tried != channels.size(); tried++) { HecChannel channel = channels.get(index); index = (index + 1) % channels.size(); - if (!channel.hasBackPressure()) { + if (!channel.hasBackPressure() && !channel.isNotAvailable()) { channel.send(batch); return; } diff --git a/src/main/java/com/splunk/hecclient/Poller.java b/src/main/java/com/splunk/hecclient/Poller.java index fcc861fc..b260089f 100644 --- a/src/main/java/com/splunk/hecclient/Poller.java +++ b/src/main/java/com/splunk/hecclient/Poller.java @@ -20,7 +20,7 @@ public interface Poller { void stop(); void add(HecChannel channel, EventBatch batch, String response); void fail(HecChannel channel, EventBatch batch, Exception ex); - + void stickySessionHandler(HecChannel channel); // minimum load channel HecChannel getMinLoadChannel(); long getTotalOutstandingEventBatches(); diff --git a/src/main/java/com/splunk/hecclient/ResponsePoller.java b/src/main/java/com/splunk/hecclient/ResponsePoller.java index e74e4251..57f1a716 100644 --- a/src/main/java/com/splunk/hecclient/ResponsePoller.java +++ b/src/main/java/com/splunk/hecclient/ResponsePoller.java @@ -31,6 +31,9 @@ public ResponsePoller(PollerCallback callback) { this.callback = callback; } + @Override + public void stickySessionHandler(HecChannel channel) {} + @Override public void start() { } diff --git a/src/test/java/com/splunk/hecclient/HecAckPollerTest.java b/src/test/java/com/splunk/hecclient/HecAckPollerTest.java index a9f71266..015f0348 100644 --- a/src/test/java/com/splunk/hecclient/HecAckPollerTest.java +++ b/src/test/java/com/splunk/hecclient/HecAckPollerTest.java @@ -323,4 +323,36 @@ public void getMinLoadChannel() { poller.stop(); } + + @Test + public void 
stickySessionHandler() { + PollerCallbackMock cb = new PollerCallbackMock(); + HecAckPoller poller = new HecAckPoller(cb); + poller.setAckPollThreads(1); + poller.setAckPollInterval(2); + poller.start(); + + IndexerMock indexer = new IndexerMock(); + String ackResponse = "{\"acks\":{\"1\":true}}"; + indexer.setResponse(ackResponse); + + HecChannel ch = new HecChannel(indexer); + EventBatch batch = UnitUtil.createBatch(); + + String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}"; + poller.add(ch, batch, response); + + long outstanding = poller.getTotalOutstandingEventBatches(); + Assert.assertEquals(1, outstanding); + UnitUtil.milliSleep(3000); + + String oldId = ch.getId(); + poller.stickySessionHandler(ch); + Assert.assertNotEquals(oldId, ch.getId()); + + outstanding = poller.getTotalOutstandingEventBatches(); + Assert.assertEquals(0, outstanding); + + poller.stop(); + } } diff --git a/src/test/java/com/splunk/hecclient/HecChannelTest.java b/src/test/java/com/splunk/hecclient/HecChannelTest.java index 0dd43a60..c5138819 100644 --- a/src/test/java/com/splunk/hecclient/HecChannelTest.java +++ b/src/test/java/com/splunk/hecclient/HecChannelTest.java @@ -36,6 +36,18 @@ public void getterSetter() { Assert.assertEquals(id, ch.toString()); Assert.assertNotNull(ch.hashCode()); + + Assert.assertFalse(ch.isNotAvailable()); + ch.setAvailable(true); + Assert.assertFalse(ch.isNotAvailable()); + ch.setAvailable(false); + Assert.assertTrue(ch.isNotAvailable()); + + ch.setId(); + String newId = ch.getId(); + Assert.assertNotNull(newId); + Assert.assertFalse(newId.isEmpty()); + Assert.assertNotEquals(id, newId); } @Test diff --git a/src/test/java/com/splunk/hecclient/LoadBalancerTest.java b/src/test/java/com/splunk/hecclient/LoadBalancerTest.java index 6fb690c6..dacd923f 100644 --- a/src/test/java/com/splunk/hecclient/LoadBalancerTest.java +++ b/src/test/java/com/splunk/hecclient/LoadBalancerTest.java @@ -102,6 +102,32 @@ public void sendWithOneBackPressure() { 
Assert.assertEquals(6, indexers.get(2).getBatches().size()); } + @Test + public void sendWithOneNotAvailable() { + LoadBalancer lb = new LoadBalancer(); + List indexers = new ArrayList<>(); + + int numberOfChannels = 3; + for (int i = 0; i < numberOfChannels; i++) { + IndexerMock indexer = new IndexerMock(); + indexers.add(indexer); + HecChannel ch = new HecChannel(indexer); + if(i == 0) { + ch.setAvailable(false); + } + lb.add(ch); + } + + int numberOfBatches = 12; + for (int i = 0; i < numberOfBatches; i++) { + lb.send(UnitUtil.createBatch()); + } + + Assert.assertEquals(0, indexers.get(0).getBatches().size()); + Assert.assertEquals(6, indexers.get(1).getBatches().size()); + Assert.assertEquals(6, indexers.get(2).getBatches().size()); + } + @Test(expected = HecException.class) public void sendWithoutChannels() { LoadBalancer lb = new LoadBalancer(); diff --git a/src/test/java/com/splunk/hecclient/PollerMock.java b/src/test/java/com/splunk/hecclient/PollerMock.java index 8fe90990..7a89ef0a 100644 --- a/src/test/java/com/splunk/hecclient/PollerMock.java +++ b/src/test/java/com/splunk/hecclient/PollerMock.java @@ -62,6 +62,9 @@ public void add(HecChannel channel, EventBatch batch, String resp) { this.response = resp; } + @Override + public void stickySessionHandler(HecChannel channel) {} + public boolean isStarted() { return started; }