diff --git a/README.md b/README.md index 41d88cec..7d44dae6 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,8 @@ Use the below schema to configure Splunk Connect for Kafka | `splunk.hec.ssl.trust.store.path` | Location of Java KeyStore. |`""`| | `splunk.hec.ssl.trust.store.password` | Password for Java KeyStore. |`""`| | `splunk.hec.json.event.formatted` | Set to `true` for events that are already in HEC format. Valid settings are `true` or `false`. |`false`| +| `splunk.hec.max.outstanding.events` | Maximum number of unacknowledged events kept in memory by the connector. When this limit is reached, back-pressure is triggered to slow down collection. | `1000000` | +| `splunk.hec.max.retries` | Number of times a failed batch is resent before its events are dropped completely. Warning: dropping events results in data loss. The default is `-1`, which retries indefinitely. | `-1` | ### Acknowledgement Parameters #### Use Ack | Name | Description | Default Value | diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index 1001127d..fae13e15 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -210,4 +210,4 @@ 1.8 4.12 - \ No newline at end of file + diff --git a/src/main/java/com/splunk/hecclient/EventBatch.java b/src/main/java/com/splunk/hecclient/EventBatch.java index c18feae8..91ef550c 100644 --- a/src/main/java/com/splunk/hecclient/EventBatch.java +++ b/src/main/java/com/splunk/hecclient/EventBatch.java @@ -24,14 +24,18 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.SequenceInputStream; + import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Map; +import java.util.UUID; public abstract class EventBatch { private static Logger log = LoggerFactory.getLogger(EventBatch.class); + private UUID batchUUID = UUID.randomUUID(); + private static final int INIT = 0; private static final int COMMITTED = 1; private static final int FAILED = 2; @@ -103,6 +107,8 @@ public final List 
getEvents() { return events; } + public final String getUUID() {return batchUUID.toString(); } + // Total length of data for all events public final int length() { return len; diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 5f0ce394..65294405 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -126,22 +126,24 @@ private void handleFailedBatches() { return; } - log.debug("going to handle {} failed batches", failed.size()); + log.debug("handling {} failed batches", failed.size()); long failedEvents = 0; // if there are failed ones, first deal with them for (final EventBatch batch: failed) { failedEvents += batch.size(); if (connectorConfig.maxRetries > 0 && batch.getFailureCount() > connectorConfig.maxRetries) { - log.error("dropping EventBatch with {} events in it since it reaches max retries {}", - batch.size(), connectorConfig.maxRetries); + log.error("dropping EventBatch {} with {} events after reaching maximum retries {}", + batch.getUUID(), batch.size(), connectorConfig.maxRetries); continue; } + log.warn("attempting to resend batch {} with {} events, this is attempt {} out of {} for this batch ", + batch.getUUID(), batch.size(), batch.getFailureCount(), connectorConfig.maxRetries); send(batch); } log.info("handled {} failed batches with {} events", failed.size(), failedEvents); if (failedEvents * 10 > connectorConfig.maxOutstandingEvents) { - String msg = String.format("failed events reach 10 %% of max outstanding events %d, pause the pull for a while", connectorConfig.maxOutstandingEvents); + String msg = String.format("failed events have reached 10 %% of max outstanding events %d, pausing the pull of events for a while", connectorConfig.maxOutstandingEvents); throw new RetriableException(new HecException(msg)); } } @@ -284,7 +286,7 @@ private void send(final EventBatch batch) { } 
catch (Exception ex) { batch.fail(); onEventFailure(Arrays.asList(batch), ex); - log.error("failed to send batch", ex); + log.error("failed to send batch {}" ,batch.getUUID(), ex); } }