From 9bc0d8512941ba0895faeff9e04141749e26ca22 Mon Sep 17 00:00:00 2001 From: Don Tregonning Date: Tue, 30 Apr 2019 14:12:00 -0700 Subject: [PATCH 1/3] update 3rd party libs --- dependency-reduced-pom.xml | 2 +- pom.xml | 15 +++++----- .../connect/SplunkSinkConnectorConfig.java | 30 +++++++++---------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index 6c52b247..5bf16bbb 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -182,7 +182,7 @@ org.slf4j slf4j-simple - 1.7.25 + 1.7.26 test diff --git a/pom.xml b/pom.xml index 553f5631..54a04651 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,6 @@ 5.0.1 ${junit.version}.1 1.0.1 - @@ -26,20 +25,20 @@ com.fasterxml.jackson.core jackson-core - 2.9.5 + 2.9.8 compile com.fasterxml.jackson.core jackson-databind - 2.9.5 + 2.9.8 compile org.apache.kafka connect-api - 2.0.0 + 2.2.0 compile @@ -79,7 +78,7 @@ org.apache.httpcomponents httpclient - 4.5.3 + 4.5.8 commons-logging @@ -102,7 +101,7 @@ commons-codec commons-codec - 1.11 + 1.12 compile @@ -110,14 +109,14 @@ org.slf4j slf4j-simple - 1.7.25 + 1.7.26 test org.slf4j slf4j-api - 1.7.25 + 1.7.26 compile diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 0d36dee4..d2facf98 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -328,21 +328,21 @@ public String toString() { + "ackPollThreads:" + ackPollThreads + ", " + "maxHttpConnectionPerChannel:" + maxHttpConnPerChannel + ", " + "totalHecChannels:" + totalHecChannels + ", " - + "enrichment: " + getString(ENRICHMENT_CONF) + ", " - + "maxBatchSize: " + maxBatchSize + ", " - + "numberOfThreads: " + numberOfThreads + ", " - + "lineBreaker: " + lineBreaker + ", " - + "maxOutstandingEvents: " + maxOutstandingEvents + ", " - + 
"maxRetries: " + maxRetries + ", " - + "useRecordTimestamp: " + useRecordTimestamp + ", " - + "hecEventFormatted" + hecEventFormatted + ", " - + "trackData: " + trackData + "," - + "headerSupport:" + headerSupport + "," - + "headerCustom:" + headerCustom + "," - + "headerIndex:" + headerIndex + "," - + "headerSource:" + headerSource + "," - + "headerSourcetype:" + headerSourcetype + "," - + "headerHost" + headerHost; + + "enrichment:" + getString(ENRICHMENT_CONF) + ", " + + "maxBatchSize:" + maxBatchSize + ", " + + "numberOfThreads:" + numberOfThreads + ", " + + "lineBreaker:" + lineBreaker + ", " + + "maxOutstandingEvents:" + maxOutstandingEvents + ", " + + "maxRetries:" + maxRetries + ", " + + "useRecordTimestamp:" + useRecordTimestamp + ", " + + "hecEventFormatted:" + hecEventFormatted + ", " + + "trackData:" + trackData + ", " + + "headerSupport:" + headerSupport + ", " + + "headerCustom:" + headerCustom + ", " + + "headerIndex:" + headerIndex + ", " + + "headerSource:" + headerSource + ", " + + "headerSourcetype:" + headerSourcetype + ", " + + "headerHost:" + headerHost; } private static String[] split(String data, String sep) { From e7759a21b835823b25d316af2cdf6d7b7109dd29 Mon Sep 17 00:00:00 2001 From: Don Tregonning Date: Wed, 1 May 2019 11:47:07 -0700 Subject: [PATCH 2/3] Added warning for resend of batch and added UUID to batch --- README.md | 2 ++ src/main/java/com/splunk/hecclient/EventBatch.java | 6 ++++++ .../com/splunk/kafka/connect/SplunkSinkTask.java | 12 +++++++----- .../com/splunk/kafka/connect/SplunkSinkTaskTest.java | 1 + 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 41d88cec..7d44dae6 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,8 @@ Use the below schema to configure Splunk Connect for Kafka | `splunk.hec.ssl.trust.store.path` | Location of Java KeyStore. |`""`| | `splunk.hec.ssl.trust.store.password` | Password for Java KeyStore. 
|`""`| | `splunk.hec.json.event.formatted` | Set to `true` for events that are already in HEC format. Valid settings are `true` or `false`. |`false`| +| `splunk.hec.max.outstanding.events` | Maximum number of unacknowledged events kept in memory by the connector. Back-pressure is triggered to slow down event collection if this limit is reached. | `1000000` | +| `splunk.hec.max.retries` | Number of times a failed batch will be resent before its events are dropped completely. Warning: dropping events results in data loss. The default is `-1`, which retries indefinitely. | `-1` | ### Acknowledgement Parameters #### Use Ack | Name | Description | Default Value | diff --git a/src/main/java/com/splunk/hecclient/EventBatch.java b/src/main/java/com/splunk/hecclient/EventBatch.java index c18feae8..91ef550c 100644 --- a/src/main/java/com/splunk/hecclient/EventBatch.java +++ b/src/main/java/com/splunk/hecclient/EventBatch.java @@ -24,14 +24,18 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.SequenceInputStream; + import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Map; +import java.util.UUID; public abstract class EventBatch { private static Logger log = LoggerFactory.getLogger(EventBatch.class); + private UUID batchUUID = UUID.randomUUID(); + private static final int INIT = 0; private static final int COMMITTED = 1; private static final int FAILED = 2; @@ -103,6 +107,8 @@ public final List getEvents() { return events; } + public final String getUUID() {return batchUUID.toString(); } + // Total length of data for all events public final int length() { return len; diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 9b2c4dae..52b7e154 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -114,22 +114,24 @@ private void handleFailedBatches() { return; } - 
log.debug("going to handle {} failed batches", failed.size()); + log.debug("handling {} failed batches", failed.size()); long failedEvents = 0; // if there are failed ones, first deal with them for (final EventBatch batch: failed) { failedEvents += batch.size(); if (connectorConfig.maxRetries > 0 && batch.getFailureCount() > connectorConfig.maxRetries) { - log.error("dropping EventBatch with {} events in it since it reaches max retries {}", - batch.size(), connectorConfig.maxRetries); + log.error("dropping EventBatch {} with {} events after reaching maximum retries {}", + batch.getUUID(), batch.size(), connectorConfig.maxRetries); continue; } + log.warn("attempting to resend batch {} with {} events, this is attempt {} out of {} for this batch ", + batch.getUUID(), batch.size(), batch.getFailureCount(), connectorConfig.maxRetries); send(batch); } log.info("handled {} failed batches with {} events", failed.size(), failedEvents); if (failedEvents * 10 > connectorConfig.maxOutstandingEvents) { - String msg = String.format("failed events reach 10 %% of max outstanding events %d, pause the pull for a while", connectorConfig.maxOutstandingEvents); + String msg = String.format("failed events have reached 10 %% of max outstanding events %d, pausing the pull of events for a while", connectorConfig.maxOutstandingEvents); throw new RetriableException(new HecException(msg)); } } @@ -258,7 +260,7 @@ private void send(final EventBatch batch) { } catch (Exception ex) { batch.fail(); onEventFailure(Arrays.asList(batch), ex); - log.error("failed to send batch", ex); + log.error("failed to send batch {}" ,batch.getUUID(), ex); } } diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index 28773d50..bf49b77a 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -176,6 +176,7 @@ public void 
putWithMaxEvents() { task.stop(); } + @Test public void putWithEmptyRecords() { UnitUtil uu = new UnitUtil(0); From 15b673dc84c512c466c12d4d8f86d36b17bf59e4 Mon Sep 17 00:00:00 2001 From: Donald Tregonning Date: Wed, 1 May 2019 12:45:32 -0700 Subject: [PATCH 3/3] Update SplunkSinkTaskTest.java removed extra line --- src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index bf49b77a..28773d50 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -176,7 +176,6 @@ public void putWithMaxEvents() { task.stop(); } - @Test public void putWithEmptyRecords() { UnitUtil uu = new UnitUtil(0);