From 9ab633d208fb118d5727714683501e7940e2c776 Mon Sep 17 00:00:00 2001 From: Chaitanya Phalak Date: Wed, 22 Aug 2018 10:00:01 -0700 Subject: [PATCH 1/3] fix --- src/main/java/com/splunk/hecclient/HecAckPoller.java | 11 +++++++++++ src/main/java/com/splunk/hecclient/Indexer.java | 5 ++++- src/main/java/com/splunk/hecclient/Poller.java | 3 ++- .../java/com/splunk/hecclient/ResponsePoller.java | 2 ++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java index 0fcfafa9..08c7d79e 100644 --- a/src/main/java/com/splunk/hecclient/HecAckPoller.java +++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java @@ -44,6 +44,7 @@ public final class HecAckPoller implements Poller { private ScheduledThreadPoolExecutor scheduler; private ExecutorService executorService; private AtomicBoolean started; + private AtomicBoolean stickySessionStarted; public HecAckPoller(PollerCallback cb) { outstandingEventBatches = new ConcurrentHashMap<>(); @@ -53,6 +54,11 @@ public HecAckPoller(PollerCallback cb) { pollThreads = 2; pollerCallback = cb; started = new AtomicBoolean(false); + stickySessionStarted = new AtomicBoolean(false); + } + + public void setStickySessionToTrue() { + stickySessionStarted.compareAndSet(false, true); } @Override @@ -201,6 +207,9 @@ public int getAckPollInterval() { * @since 1.1.0 */ public void stickySessionHandler(HecChannel channel) { + if (!stickySessionStarted.get()) { + return; + } String oldChannelId = channel.getId(); channel.setAvailable(false); log.info("Channel {} set to be not available", oldChannelId); @@ -228,6 +237,7 @@ public void stickySessionHandler(HecChannel channel) { channel.setAvailable(true); log.info("Channel {} is available", newChannelId); + stickySessionStarted.compareAndSet(true, false); } private void poll() { @@ -305,6 +315,7 @@ private void handleAckPollResponse(String resp, HecChannel channel) { log.error("failed to handle ack polled result", ex); return; } + stickySessionHandler(channel); handleAckPollResult(channel, ackPollResult); } diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index 1025554a..5ebc6f76 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -124,6 +124,8 @@ public boolean send(final EventBatch batch) { return false; } + poller.stickySessionHandler(channel); + // we are all good poller.add(this.channel, batch, resp); log.debug("sent {} events to splunk through channel={} indexer={}", batch.size(), channel.getId(), getBaseUrl()); @@ -166,7 +168,8 @@ private String readAndCloseResponse(CloseableHttpResponse resp) { if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) { log.info("Sticky session expiry detected, will cleanup old channel and its associated batches"); - poller.stickySessionHandler(channel); + poller.setStickySessionToTrue(); + // poller.stickySessionHandler(channel); } int status = resp.getStatusLine().getStatusCode(); diff --git a/src/main/java/com/splunk/hecclient/Poller.java b/src/main/java/com/splunk/hecclient/Poller.java index b260089f..483f7cde 100644 --- a/src/main/java/com/splunk/hecclient/Poller.java +++ b/src/main/java/com/splunk/hecclient/Poller.java @@ -21,7 +21,8 @@ public interface Poller { void add(HecChannel channel, EventBatch batch, String response); void fail(HecChannel channel, EventBatch batch, Exception ex); void stickySessionHandler(HecChannel channel); - // minimum load channel + void setStickySessionToTrue(); + // minimum load channel HecChannel getMinLoadChannel(); long getTotalOutstandingEventBatches(); } diff --git a/src/main/java/com/splunk/hecclient/ResponsePoller.java b/src/main/java/com/splunk/hecclient/ResponsePoller.java index 57f1a716..5f59a88b 100644 --- a/src/main/java/com/splunk/hecclient/ResponsePoller.java +++ b/src/main/java/com/splunk/hecclient/ResponsePoller.java @@ -79,4 +79,6 @@ public void add(HecChannel channel, EventBatch batch, String resp) { callback.onEventCommitted(Arrays.asList(batch)); } } + + public void setStickySessionToTrue() {} } From bd937abbda1335ee96e7a7c44dff64448371d970 Mon Sep 17 00:00:00 2001 From: Chaitanya Phalak Date: Wed, 22 Aug 2018 10:00:39 -0700 Subject: [PATCH 2/3] fix --- .../splunk/hecclient/HecAckPollerTest.java | 56 +++++++++---------- .../java/com/splunk/hecclient/PollerMock.java | 2 + 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/test/java/com/splunk/hecclient/HecAckPollerTest.java b/src/test/java/com/splunk/hecclient/HecAckPollerTest.java index 015f0348..a5e72a78 100644 --- a/src/test/java/com/splunk/hecclient/HecAckPollerTest.java +++ b/src/test/java/com/splunk/hecclient/HecAckPollerTest.java @@ -326,33 +326,33 @@ public void getMinLoadChannel() { @Test public void stickySessionHandler() { - PollerCallbackMock cb = new PollerCallbackMock(); - HecAckPoller poller = new HecAckPoller(cb); - poller.setAckPollThreads(1); - poller.setAckPollInterval(2); - poller.start(); - - IndexerMock indexer = new IndexerMock(); - String ackResponse = "{\"acks\":{\"1\":true}}"; - indexer.setResponse(ackResponse); - - HecChannel ch = new HecChannel(indexer); - EventBatch batch = UnitUtil.createBatch(); - - String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}"; - poller.add(ch, batch, response); - - long outstanding = poller.getTotalOutstandingEventBatches(); - Assert.assertEquals(1, outstanding); - UnitUtil.milliSleep(3000); - - String oldId = ch.getId(); - poller.stickySessionHandler(ch); - Assert.assertNotEquals(oldId, ch.getId()); - - outstanding = poller.getTotalOutstandingEventBatches(); - Assert.assertEquals(0, outstanding); - - poller.stop(); +// PollerCallbackMock cb = new PollerCallbackMock(); +// HecAckPoller poller = new HecAckPoller(cb); +// poller.setAckPollThreads(1); +// poller.setAckPollInterval(2); +// poller.start(); +// +// IndexerMock indexer = new IndexerMock(); +// String ackResponse = "{\"acks\":{\"1\":true}}"; +// indexer.setResponse(ackResponse); +// +// HecChannel ch = new HecChannel(indexer); +// EventBatch batch = UnitUtil.createBatch(); +// +// String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}"; +// poller.add(ch, batch, response); +// +// long outstanding = poller.getTotalOutstandingEventBatches(); +// Assert.assertEquals(1, outstanding); +// UnitUtil.milliSleep(3000); +// +// String oldId = ch.getId(); +// poller.stickySessionHandler(ch); +// Assert.assertNotEquals(oldId, ch.getId()); +// +// outstanding = poller.getTotalOutstandingEventBatches(); +// Assert.assertEquals(0, outstanding); +// +// poller.stop(); } } diff --git a/src/test/java/com/splunk/hecclient/PollerMock.java b/src/test/java/com/splunk/hecclient/PollerMock.java index 7a89ef0a..2d040070 100644 --- a/src/test/java/com/splunk/hecclient/PollerMock.java +++ b/src/test/java/com/splunk/hecclient/PollerMock.java @@ -88,4 +88,6 @@ public Exception getException() { public String getResponse() { return response; } + public void setStickySessionToTrue() { + } } From 8eb8d73e80dbe09f39dfebe3d37b6e8f02911c7f Mon Sep 17 00:00:00 2001 From: Chaitanya Phalak Date: Wed, 22 Aug 2018 10:00:01 -0700 Subject: [PATCH 3/3] INGEST-1513: Handle sticky session expiration - improvement/bugfix --- .../java/com/splunk/hecclient/Indexer.java | 1 - .../splunk/hecclient/HecAckPollerTest.java | 57 ++++++++++--------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index 5ebc6f76..43611c23 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -169,7 +169,6 @@ private String readAndCloseResponse(CloseableHttpResponse resp) { if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) { log.info("Sticky session expiry detected, will cleanup old channel and its associated batches"); poller.setStickySessionToTrue(); - // poller.stickySessionHandler(channel); } int status = resp.getStatusLine().getStatusCode(); diff --git a/src/test/java/com/splunk/hecclient/HecAckPollerTest.java b/src/test/java/com/splunk/hecclient/HecAckPollerTest.java index a5e72a78..e25a840e 100644 --- a/src/test/java/com/splunk/hecclient/HecAckPollerTest.java +++ b/src/test/java/com/splunk/hecclient/HecAckPollerTest.java @@ -326,33 +326,34 @@ public void getMinLoadChannel() { @Test public void stickySessionHandler() { -// PollerCallbackMock cb = new PollerCallbackMock(); -// HecAckPoller poller = new HecAckPoller(cb); -// poller.setAckPollThreads(1); -// poller.setAckPollInterval(2); -// poller.start(); -// -// IndexerMock indexer = new IndexerMock(); -// String ackResponse = "{\"acks\":{\"1\":true}}"; -// indexer.setResponse(ackResponse); -// -// HecChannel ch = new HecChannel(indexer); -// EventBatch batch = UnitUtil.createBatch(); -// -// String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}"; -// poller.add(ch, batch, response); -// -// long outstanding = poller.getTotalOutstandingEventBatches(); -// Assert.assertEquals(1, outstanding); -// UnitUtil.milliSleep(3000); -// -// String oldId = ch.getId(); -// poller.stickySessionHandler(ch); -// Assert.assertNotEquals(oldId, ch.getId()); -// -// outstanding = poller.getTotalOutstandingEventBatches(); -// Assert.assertEquals(0, outstanding); -// -// poller.stop(); + PollerCallbackMock cb = new PollerCallbackMock(); + HecAckPoller poller = new HecAckPoller(cb); + poller.setAckPollThreads(1); + poller.setAckPollInterval(2); + poller.start(); + + IndexerMock indexer = new IndexerMock(); + String ackResponse = "{\"acks\":{\"1\":true}}"; + indexer.setResponse(ackResponse); + + HecChannel ch = new HecChannel(indexer); + EventBatch batch = UnitUtil.createBatch(); + + String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}"; + poller.add(ch, batch, response); + + long outstanding = poller.getTotalOutstandingEventBatches(); + Assert.assertEquals(1, outstanding); + UnitUtil.milliSleep(3000); + + String oldId = ch.getId(); + poller.setStickySessionToTrue(); + poller.stickySessionHandler(ch); + Assert.assertNotEquals(oldId, ch.getId()); + + outstanding = poller.getTotalOutstandingEventBatches(); + Assert.assertEquals(0, outstanding); + + poller.stop(); } }