@@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
         }
 
         if (channelEvents.get(resp.getAckId()) != null) {
-            log.error("ackId={} already exists for channel={} index={}", resp.getAckId(), channel, channel.getIndexer());
+            log.warn("ackId={} already exists for channel={} index={} data may be duplicated in Splunk", resp.getAckId(), channel, channel.getIndexer());
             return;
         }
 
@@ -184,6 +184,51 @@ public int getAckPollInterval() {
         return ackPollInterval;
     }
 
+    /**
+     * StickySessionHandler reassigns the channel id and fails the outstanding batches for the given HecChannel.
+     * The HecChannel is also unavailable while this is in progress.
+     *
+     * @param channel the HecChannel whose id has to be changed and whose batches have to be failed.
+     * @see HecChannel
+     * @since 1.1.0
+     */
+    public void stickySessionHandler(HecChannel channel) {
+        String oldChannelId = channel.getId();
+        // Set channel unavailable
+        channel.setAvailable(false);
+        log.info("Channel {} set to unavailable", oldChannelId);
+        // Get the outstanding batches for the channel
+        ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
+        // Remove the channel's batches from the poller
+        if (channelBatches != null && channelBatches.size() > 0) {
+            log.info("Failing {} batches for channel {}; these will be resent by the connector.", channelBatches.size(), oldChannelId);
+            // Remove batches from the Kafka record tracker so they are failed and resent
+            if (pollerCallback != null) {
+                List<EventBatch> expired = new ArrayList<>();
+                Iterator<Map.Entry<Long, EventBatch>> iter = channelBatches.entrySet().iterator();
+                while (iter.hasNext()) {
+                    Map.Entry<Long, EventBatch> pair = iter.next();
+                    EventBatch batch = pair.getValue();
+                    totalOutstandingEventBatches.decrementAndGet();
+                    batch.fail();
+                    expired.add(batch);
+                    iter.remove();
+                }
+                pollerCallback.onEventFailure(expired, new HecException("sticky_session_expired"));
+            }
+        }
+        // Remove the channel
+        outstandingEventBatches.remove(channel);
+        // Change the channel id
+        channel.setId();
+        String newChannelId = channel.getId();
+        log.info("Changed channel id from {} to {}", oldChannelId, newChannelId);
+
+        // Set channel available
+        channel.setAvailable(true);
+        log.info("Channel {} is available", newChannelId);
+    }
+
     private void poll() {
         if (totalOutstandingEventBatches.get() <= 0 || outstandingEventBatches.size() <= 0) {
             return;
@@ -252,14 +297,16 @@ private void findAndRemoveTimedoutBatches(Map<Long, EventBatch> batches, List<Ev
 
     private void handleAckPollResponse(String resp, HecChannel channel) {
         log.debug("ackPollResponse={}, channel={}", resp, channel);
-        HecAckPollResponse ackPollResult;
+        HecAckPollResponse ackPollResult = null;
         try {
             ackPollResult = jsonMapper.readValue(resp, HecAckPollResponse.class);
         } catch (Exception ex) {
             log.error("failed to handle ack polled result", ex);
             return;
         }
-        handleAckPollResult(channel, ackPollResult);
+        if (ackPollResult != null) {
+            handleAckPollResult(channel, ackPollResult);
+        }
     }
 
     private void handleAckPollResult(HecChannel channel, HecAckPollResponse result) {
@@ -273,19 +320,22 @@ private void handleAckPollResult(HecChannel channel, HecAckPollResponse result)
 
         List<EventBatch> committedBatches = new ArrayList<>();
         ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
-        for (Long id : ids) {
-            EventBatch batch = channelBatches.remove(id);
-            if (batch == null) {
-                log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
-                continue;
+        // Added a null check, as channelBatches may still be null (it can be removed while handling sticky sessions and is not re-added until more data is sent)
+        if (channelBatches != null) {
+            for (Long id : ids) {
+                EventBatch batch = channelBatches.remove(id);
+                if (batch == null) {
+                    log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
+                    continue;
+                }
+                totalOutstandingEventBatches.decrementAndGet();
+                batch.commit();
+                committedBatches.add(batch);
             }
-            totalOutstandingEventBatches.decrementAndGet();
-            batch.commit();
-            committedBatches.add(batch);
-        }
 
-        if (!committedBatches.isEmpty() && pollerCallback != null) {
-            pollerCallback.onEventCommitted(committedBatches);
+            if (!committedBatches.isEmpty() && pollerCallback != null) {
+                pollerCallback.onEventCommitted(committedBatches);
+            }
         }
     }
 
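A minimal caller sketch, not part of this commit, to show how the stickySessionHandler added above might be wired in: when a HEC response indicates the channel's sticky session is no longer valid, the poller fails the channel's outstanding batches (the connector resends them) and rotates the channel id. The enclosing poller class name (HecAckPoller), the StickySessionAwareHandler class, and the isStickySessionError() helper are assumptions for illustration; only HecChannel and stickySessionHandler(channel) come from the diff.

// Hypothetical caller sketch; HecAckPoller, StickySessionAwareHandler and
// isStickySessionError() are assumed names, not part of the commit above.
public class StickySessionAwareHandler {
    private final HecAckPoller poller;

    public StickySessionAwareHandler(HecAckPoller poller) {
        this.poller = poller;
    }

    // Called with the raw HEC response for a batch sent on the given channel.
    public void handleResponse(HecChannel channel, String response) {
        if (isStickySessionError(response)) {
            // Fail the channel's outstanding batches and rotate the channel id
            // before the channel is used again.
            poller.stickySessionHandler(channel);
        }
    }

    private boolean isStickySessionError(String response) {
        // Assumption: the response text signals an invalid or expired channel session.
        return response != null && response.contains("Invalid channel");
    }
}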