Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.hec.ssl.trust.store.path` | Location of Java KeyStore. |`""`|
| `splunk.hec.ssl.trust.store.password` | Password for Java KeyStore. |`""`|
| `splunk.hec.json.event.formatted` | Set to `true` for events that are already in HEC format. Valid settings are `true` or `false`. |`false`|
| `splunk.hec.max.outstanding.events` | Maximum amount of un-acknowledged events kept in memory by connector. Will trigger back-pressure event to slow down collection if reached. | `1000000` |
| `splunk.hec.max.retries` | Amount of times a failed batch will attempt to resend before dropping events completely. Warning: This will result in data loss, default is `-1` which will retry indefinitely | `-1` |
### Acknowledgement Parameters
#### Use Ack
| Name | Description | Default Value |
Expand Down
2 changes: 1 addition & 1 deletion dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,4 @@
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties>
</project>
</project>
6 changes: 6 additions & 0 deletions src/main/java/com/splunk/hecclient/EventBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public abstract class EventBatch {
private static Logger log = LoggerFactory.getLogger(EventBatch.class);

private UUID batchUUID = UUID.randomUUID();

private static final int INIT = 0;
private static final int COMMITTED = 1;
private static final int FAILED = 2;
Expand Down Expand Up @@ -103,6 +107,8 @@ public final List<Event> getEvents() {
return events;
}

public final String getUUID() {return batchUUID.toString(); }

// Total length of data for all events
public final int length() {
return len;
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,24 @@ private void handleFailedBatches() {
return;
}

log.debug("going to handle {} failed batches", failed.size());
log.debug("handling {} failed batches", failed.size());
long failedEvents = 0;
// if there are failed ones, first deal with them
for (final EventBatch batch: failed) {
failedEvents += batch.size();
if (connectorConfig.maxRetries > 0 && batch.getFailureCount() > connectorConfig.maxRetries) {
log.error("dropping EventBatch with {} events in it since it reaches max retries {}",
batch.size(), connectorConfig.maxRetries);
log.error("dropping EventBatch {} with {} events after reaching maximum retries {}",
batch.getUUID(), batch.size(), connectorConfig.maxRetries);
continue;
}
log.warn("attempting to resend batch {} with {} events, this is attempt {} out of {} for this batch ",
batch.getUUID(), batch.size(), batch.getFailureCount(), connectorConfig.maxRetries);
send(batch);
}

log.info("handled {} failed batches with {} events", failed.size(), failedEvents);
if (failedEvents * 10 > connectorConfig.maxOutstandingEvents) {
String msg = String.format("failed events reach 10 %% of max outstanding events %d, pause the pull for a while", connectorConfig.maxOutstandingEvents);
String msg = String.format("failed events have reached 10 %% of max outstanding events %d, pausing the pull of events for a while", connectorConfig.maxOutstandingEvents);
throw new RetriableException(new HecException(msg));
}
}
Expand Down Expand Up @@ -284,7 +286,7 @@ private void send(final EventBatch batch) {
} catch (Exception ex) {
batch.fail();
onEventFailure(Arrays.asList(batch), ex);
log.error("failed to send batch", ex);
log.error("failed to send batch {}" ,batch.getUUID(), ex);
}
}

Expand Down