Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions src/main/java/com/splunk/hecclient/HecAckPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
}

if (channelEvents.get(resp.getAckId()) != null) {
log.error("ackId={} already exists for channel={} index={}", resp.getAckId(), channel, channel.getIndexer());
log.warn("ackId={} already exists for channel={} index={} data may be duplicated in Splunk", resp.getAckId(), channel, channel.getIndexer());
return;
}

Expand Down Expand Up @@ -184,6 +184,52 @@ public int getAckPollInterval() {
return ackPollInterval;
}

/**
* StickySessionHandler is used to reassign channel id and fail the batches for that HecChannel.
* Also, the HecChannel will be unavailable during this period.
* StickySessionHandler follows the following flow:
* 1) Set channel unavailable
* 2) Get batches for the channel
* 3) Remove batches for the channel from the poller
* 4) Remove batches from kafka record tracker to fail them and resend
* 5) Remove channel
* 6) Change channel id
* 7) Set channel available
*
* @param channel HecChannel is the channel for which id has tobe changed and batches have to be failed.
* @see HecChannel
* @since 1.1.0
*/
public void stickySessionHandler(HecChannel channel) {
String oldChannelId = channel.getId();
channel.setAvailable(false);
log.info("Channel {} set to be not available", oldChannelId);
ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
if(channelBatches != null && channelBatches.size() > 0) {
log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId);
if (pollerCallback != null) {
List<EventBatch> expired = new ArrayList<>();
Iterator iter = channelBatches.entrySet().iterator();
while(iter.hasNext()) {
Map.Entry<Long, EventBatch> pair = (Map.Entry) iter.next();
EventBatch batch = pair.getValue();
totalOutstandingEventBatches.decrementAndGet();
batch.fail();
expired.add(batch);
iter.remove();
}
pollerCallback.onEventFailure(expired, new HecException("sticky_session_expired"));
}
}
outstandingEventBatches.remove(channel);
channel.setId();
String newChannelId = channel.getId();
log.info("Changed channel id from {} to {}", oldChannelId, newChannelId);

channel.setAvailable(true);
log.info("Channel {} is available", newChannelId);
}

private void poll() {
if (totalOutstandingEventBatches.get() <= 0 || outstandingEventBatches.size() <= 0) {
return;
Expand Down Expand Up @@ -273,19 +319,22 @@ private void handleAckPollResult(HecChannel channel, HecAckPollResponse result)

List<EventBatch> committedBatches = new ArrayList<>();
ConcurrentHashMap<Long, EventBatch> channelBatches = outstandingEventBatches.get(channel);
for (Long id: ids) {
EventBatch batch = channelBatches.remove(id);
if (batch == null) {
log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
continue;
// Added null check as channelBatches might still be null(It may be removed while handling sticky sessions and not added until we send more data)
if (channelBatches != null) {
for (Long id: ids) {
EventBatch batch = channelBatches.remove(id);
if (batch == null) {
log.warn("event batch id={} for channel={} on host={} is not in map anymore", id, channel, channel.getIndexer());
continue;
}
totalOutstandingEventBatches.decrementAndGet();
batch.commit();
committedBatches.add(batch);
}
totalOutstandingEventBatches.decrementAndGet();
batch.commit();
committedBatches.add(batch);
}

if (!committedBatches.isEmpty() && pollerCallback != null) {
pollerCallback.onEventCommitted(committedBatches);
if (!committedBatches.isEmpty() && pollerCallback != null) {
pollerCallback.onEventCommitted(committedBatches);
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions src/main/java/com/splunk/hecclient/HecChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ final class HecChannel {
private String id;
private Map<String, String> chField;
private IndexerInf indexer;
private boolean isAvailable;

public HecChannel(IndexerInf idx) {
id = newChannelId();
indexer = idx;
isAvailable = true;
}

public IndexerInf getIndexer() {
Expand All @@ -48,6 +50,10 @@ public HecChannel setTracking(boolean trackChannel) {
return this;
}

public void setId() { id = newChannelId(); }

public void setAvailable(boolean isAvailable) { this.isAvailable = isAvailable; }

public void send(final EventBatch batch) {
if (chField != null) {
batch.addExtraFields(chField);
Expand All @@ -60,9 +66,9 @@ public String executeHttpRequest(final HttpUriRequest req) {
return indexer.executeHttpRequest(req);
}

public boolean hasBackPressure() {
return indexer.hasBackPressure();
}
public boolean hasBackPressure() { return indexer.hasBackPressure(); }

public boolean isNotAvailable() { return isAvailable == false; }

@Override
public boolean equals(Object obj) {
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/com/splunk/hecclient/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

final class Indexer implements IndexerInf {
private static final Logger log = LoggerFactory.getLogger(Indexer.class);
Expand Down Expand Up @@ -124,9 +125,9 @@ public boolean send(final EventBatch batch) {
}

// we are all good
poller.add(channel, batch, resp);
log.debug("sent {} events to splunk through channel={} indexer={}",
batch.size(), channel.getId(), getBaseUrl());
poller.add(this.channel, batch, resp);
log.debug("sent {} events to splunk through channel={} indexer={}", batch.size(), channel.getId(), getBaseUrl());

return true;
}

Expand Down Expand Up @@ -161,7 +162,13 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
}
}

// log.info("event posting, channel={}, cookies={}", channel, resp.getHeaders("Set-Cookie"));
// log.info("event posting, channel={}, cookies={}, cookies.length={}", channel, resp.getHeaders("Set-Cookie"), resp.getHeaders("Set-Cookie").length);

if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) {
log.info("Sticky session expiry detected, will cleanup old channel and its associated batches");
poller.stickySessionHandler(channel);
}

int status = resp.getStatusLine().getStatusCode();
// FIXME 503 server is busy backpressure
if (status != 200 && status != 201) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void send(final EventBatch batch) {
for (int tried = 0; tried != channels.size(); tried++) {
HecChannel channel = channels.get(index);
index = (index + 1) % channels.size();
if (!channel.hasBackPressure()) {
if (!channel.hasBackPressure() && !channel.isNotAvailable()) {
channel.send(batch);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface Poller {
void stop();
void add(HecChannel channel, EventBatch batch, String response);
void fail(HecChannel channel, EventBatch batch, Exception ex);

void stickySessionHandler(HecChannel channel);
// minimum load channel
HecChannel getMinLoadChannel();
long getTotalOutstandingEventBatches();
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/splunk/hecclient/ResponsePoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public ResponsePoller(PollerCallback callback) {
this.callback = callback;
}

@Override
public void stickySessionHandler(HecChannel channel) {}

@Override
public void start() {
}
Expand Down
32 changes: 32 additions & 0 deletions src/test/java/com/splunk/hecclient/HecAckPollerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,36 @@ public void getMinLoadChannel() {

poller.stop();
}

@Test
public void stickySessionHandler() {
PollerCallbackMock cb = new PollerCallbackMock();
HecAckPoller poller = new HecAckPoller(cb);
poller.setAckPollThreads(1);
poller.setAckPollInterval(2);
poller.start();

IndexerMock indexer = new IndexerMock();
String ackResponse = "{\"acks\":{\"1\":true}}";
indexer.setResponse(ackResponse);

HecChannel ch = new HecChannel(indexer);
EventBatch batch = UnitUtil.createBatch();

String response = "{\"text\":\"Success\",\"code\":0,\"ackId\":1}";
poller.add(ch, batch, response);

long outstanding = poller.getTotalOutstandingEventBatches();
Assert.assertEquals(1, outstanding);
UnitUtil.milliSleep(3000);

String oldId = ch.getId();
poller.stickySessionHandler(ch);
Assert.assertNotEquals(oldId, ch.getId());

outstanding = poller.getTotalOutstandingEventBatches();
Assert.assertEquals(0, outstanding);

poller.stop();
}
}
12 changes: 12 additions & 0 deletions src/test/java/com/splunk/hecclient/HecChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ public void getterSetter() {

Assert.assertEquals(id, ch.toString());
Assert.assertNotNull(ch.hashCode());

Assert.assertFalse(ch.isNotAvailable());
ch.setAvailable(true);
Assert.assertFalse(ch.isNotAvailable());
ch.setAvailable(false);
Assert.assertTrue(ch.isNotAvailable());

ch.setId();
String newId = ch.getId();
Assert.assertNotNull(newId);
Assert.assertFalse(newId.isEmpty());
Assert.assertNotEquals(id, newId);
}

@Test
Expand Down
26 changes: 26 additions & 0 deletions src/test/java/com/splunk/hecclient/LoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ public void sendWithOneBackPressure() {
Assert.assertEquals(6, indexers.get(2).getBatches().size());
}

@Test
public void sendWithOneNotAvailable() {
LoadBalancer lb = new LoadBalancer();
List<IndexerMock> indexers = new ArrayList<>();

int numberOfChannels = 3;
for (int i = 0; i < numberOfChannels; i++) {
IndexerMock indexer = new IndexerMock();
indexers.add(indexer);
HecChannel ch = new HecChannel(indexer);
if(i == 0) {
ch.setAvailable(false);
}
lb.add(ch);
}

int numberOfBatches = 12;
for (int i = 0; i < numberOfBatches; i++) {
lb.send(UnitUtil.createBatch());
}

Assert.assertEquals(0, indexers.get(0).getBatches().size());
Assert.assertEquals(6, indexers.get(1).getBatches().size());
Assert.assertEquals(6, indexers.get(2).getBatches().size());
}

@Test(expected = HecException.class)
public void sendWithoutChannels() {
LoadBalancer lb = new LoadBalancer();
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/com/splunk/hecclient/PollerMock.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public void add(HecChannel channel, EventBatch batch, String resp) {
this.response = resp;
}

@Override
public void stickySessionHandler(HecChannel channel) {}

public boolean isStarted() {
return started;
}
Expand Down