@@ -44,6 +44,7 @@ public final class HecAckPoller implements Poller {
4444 private ScheduledThreadPoolExecutor scheduler ;
4545 private ExecutorService executorService ;
4646 private AtomicBoolean started ;
47+ private AtomicBoolean stickySessionStarted ;
4748
4849 public HecAckPoller (PollerCallback cb ) {
4950 outstandingEventBatches = new ConcurrentHashMap <>();
@@ -53,6 +54,11 @@ public HecAckPoller(PollerCallback cb) {
5354 pollThreads = 2 ;
5455 pollerCallback = cb ;
5556 started = new AtomicBoolean (false );
57+ stickySessionStarted = new AtomicBoolean (false );
58+ }
59+
60+ public void setStickySessionToTrue () {
61+ stickySessionStarted .compareAndSet (false , true );
5662 }
5763
5864 @ Override
@@ -201,6 +207,9 @@ public int getAckPollInterval() {
201207 * @since 1.1.0
202208 */
203209 public void stickySessionHandler (HecChannel channel ) {
210+ if (!stickySessionStarted .get ()) {
211+ return ;
212+ }
204213 String oldChannelId = channel .getId ();
205214 channel .setAvailable (false );
206215 log .info ("Channel {} set to be not available" , oldChannelId );
@@ -228,6 +237,7 @@ public void stickySessionHandler(HecChannel channel) {
228237
229238 channel .setAvailable (true );
230239 log .info ("Channel {} is available" , newChannelId );
240+ stickySessionStarted .compareAndSet (true , false );
231241 }
232242
233243 private void poll () {
@@ -305,6 +315,7 @@ private void handleAckPollResponse(String resp, HecChannel channel) {
305315 log .error ("failed to handle ack polled result" , ex );
306316 return ;
307317 }
318+ stickySessionHandler (channel );
308319 handleAckPollResult (channel , ackPollResult );
309320 }
310321
0 commit comments