Skip to content

Commit 9ab633d

Browse files
fix
1 parent d789ce1 commit 9ab633d

File tree

4 files changed

+19
-2
lines changed

4 files changed

+19
-2
lines changed

src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public final class HecAckPoller implements Poller {
4444
private ScheduledThreadPoolExecutor scheduler;
4545
private ExecutorService executorService;
4646
private AtomicBoolean started;
47+
private AtomicBoolean stickySessionStarted;
4748

4849
public HecAckPoller(PollerCallback cb) {
4950
outstandingEventBatches = new ConcurrentHashMap<>();
@@ -53,6 +54,11 @@ public HecAckPoller(PollerCallback cb) {
5354
pollThreads = 2;
5455
pollerCallback = cb;
5556
started = new AtomicBoolean(false);
57+
stickySessionStarted = new AtomicBoolean(false);
58+
}
59+
60+
public void setStickySessionToTrue() {
61+
stickySessionStarted.compareAndSet(false, true);
5662
}
5763

5864
@Override
@@ -201,6 +207,9 @@ public int getAckPollInterval() {
201207
* @since 1.1.0
202208
*/
203209
public void stickySessionHandler(HecChannel channel) {
210+
if (!stickySessionStarted.get()) {
211+
return;
212+
}
204213
String oldChannelId = channel.getId();
205214
channel.setAvailable(false);
206215
log.info("Channel {} set to be not available", oldChannelId);
@@ -228,6 +237,7 @@ public void stickySessionHandler(HecChannel channel) {
228237

229238
channel.setAvailable(true);
230239
log.info("Channel {} is available", newChannelId);
240+
stickySessionStarted.compareAndSet(true, false);
231241
}
232242

233243
private void poll() {
@@ -305,6 +315,7 @@ private void handleAckPollResponse(String resp, HecChannel channel) {
305315
log.error("failed to handle ack polled result", ex);
306316
return;
307317
}
318+
stickySessionHandler(channel);
308319
handleAckPollResult(channel, ackPollResult);
309320
}
310321

src/main/java/com/splunk/hecclient/Indexer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ public boolean send(final EventBatch batch) {
124124
return false;
125125
}
126126

127+
poller.stickySessionHandler(channel);
128+
127129
// we are all good
128130
poller.add(this.channel, batch, resp);
129131
log.debug("sent {} events to splunk through channel={} indexer={}", batch.size(), channel.getId(), getBaseUrl());
@@ -166,7 +168,8 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
166168

167169
if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) {
168170
log.info("Sticky session expiry detected, will cleanup old channel and its associated batches");
169-
poller.stickySessionHandler(channel);
171+
poller.setStickySessionToTrue();
172+
// poller.stickySessionHandler(channel);
170173
}
171174

172175
int status = resp.getStatusLine().getStatusCode();

src/main/java/com/splunk/hecclient/Poller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public interface Poller {
2121
void add(HecChannel channel, EventBatch batch, String response);
2222
void fail(HecChannel channel, EventBatch batch, Exception ex);
2323
void stickySessionHandler(HecChannel channel);
24-
// minimum load channel
24+
void setStickySessionToTrue();
25+
// minimum load channel
2526
HecChannel getMinLoadChannel();
2627
long getTotalOutstandingEventBatches();
2728
}

src/main/java/com/splunk/hecclient/ResponsePoller.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,6 @@ public void add(HecChannel channel, EventBatch batch, String resp) {
7979
callback.onEventCommitted(Arrays.asList(batch));
8080
}
8181
}
82+
83+
public void setStickySessionToTrue() {}
8284
}

0 commit comments

Comments
 (0)