2 changes: 2 additions & 0 deletions .github/workflows/ci_build_test.yaml
@@ -57,6 +57,8 @@ jobs:
kafka_package: "kafka_2.13-2.7.1.tgz"
- kafka_version: "2.8.0"
kafka_package: "kafka_2.13-2.8.0.tgz"
- kafka_version: "3.0.0"
kafka_package: "kafka_2.13-3.0.0.tgz"
env:
CI_SPLUNK_VERSION: "8.2.2"
CI_SPLUNK_FILENAME: splunk-8.2.2-87344edfcdb4-Linux-x86_64.tgz
1 change: 1 addition & 0 deletions README.md
@@ -164,6 +164,7 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.hec.max.retries` | Amount of times a failed batch will attempt to resend before dropping events completely. Warning: This will result in data loss, default is `-1` which will retry indefinitely | `-1` |
| `splunk.hec.backoff.threshhold.seconds` | The amount of time Splunk Connect for Kafka waits to attempt resending after errors from a HEC endpoint." | `60` |
| `splunk.hec.lb.poll.interval` | Specify this parameter(in seconds) to control the polling interval(increase to do less polling, decrease to do more frequent polling) | `120` |
| `splunk.hec.enable.compression` | Valid settings are true or false. Enables or disables gzip compression. | `false` |
### Acknowledgement Parameters
#### Use Ack
| Name | Description | Default Value |
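For orientation, here is a minimal sketch of a connector configuration map with the new option turned on. Only `splunk.hec.enable.compression` is the key introduced by this PR; the URI and token values are illustrative placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class CompressionSettingSketch {
    public static void main(String[] args) {
        // Settings as Kafka Connect would hand them to the sink connector.
        Map<String, String> settings = new HashMap<>();
        settings.put("splunk.hec.uri", "https://splunk.example.com:8088");        // placeholder
        settings.put("splunk.hec.token", "00000000-0000-0000-0000-000000000000"); // placeholder
        settings.put("splunk.hec.enable.compression", "true");                    // new option, defaults to false
        settings.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Leaving the key unset keeps the current uncompressed behavior, since the default is `false`.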
52 changes: 42 additions & 10 deletions src/main/java/com/splunk/hecclient/EventBatch.java
@@ -17,19 +17,14 @@

import org.apache.http.HttpEntity;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.io.*;
import java.util.*;
import java.util.zip.GZIPOutputStream;

public abstract class EventBatch {
private static Logger log = LoggerFactory.getLogger(EventBatch.class);
@@ -42,6 +37,7 @@ public abstract class EventBatch {

private volatile int status = INIT;
private int failureCount = 0;
private boolean enableCompression;
private long sendTimestamp = System.currentTimeMillis() / 1000; // in seconds
protected int len;
protected List<Event> events = new ArrayList<>();
@@ -129,6 +125,13 @@ public final HttpEntity getHttpEntity() {
return e;
}

public final HttpEntity getHttpEntityTemplate() {
AbstractHttpEntity e = new EntityTemplate(new GzipDataContentProducer());
e.setContentEncoding("gzip");
e.setContentType(getContentType());
return e;
}

@Override
public final String toString() {
StringBuilder builder = new StringBuilder();
@@ -141,6 +144,35 @@ public final String toString() {
return builder.toString();
}

public boolean isEnableCompression() {
return enableCompression;
}

public void setEnableCompression(boolean enableCompression) {
this.enableCompression = enableCompression;
}

public final byte[] getDataOfBatch() throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
for (final Event e : events) {
e.writeTo(bos);
}
byte[] unCompressBytes = bos.toByteArray();
return unCompressBytes;
}
}

private class GzipDataContentProducer implements ContentProducer {

@Override
public void writeTo(OutputStream outputStream) throws IOException {
OutputStream out = new GZIPOutputStream(outputStream);
out.write(getDataOfBatch());
out.flush();
out.close();
}
}

private class HttpEventBatchEntity extends AbstractHttpEntity {
@Override
public boolean isRepeatable() {
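A self-contained sketch of the `EntityTemplate`/`ContentProducer` pattern that the new `getHttpEntityTemplate()` builds on, assuming Apache HttpClient 4.x (the same `org.apache.http.entity` classes imported above). The payload and content type are illustrative stand-ins for what `getDataOfBatch()` and `getContentType()` return.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;

public class GzipEntitySketch {
    public static void main(String[] args) throws IOException {
        byte[] payload = "{\"event\":\"hello world!\"}\n".getBytes(StandardCharsets.UTF_8);

        // The producer compresses lazily, at write time, mirroring what
        // GzipDataContentProducer does with the bytes from getDataOfBatch().
        ContentProducer producer = outputStream -> {
            try (OutputStream gzip = new GZIPOutputStream(outputStream)) {
                gzip.write(payload);
            }
        };

        AbstractHttpEntity entity = new EntityTemplate(producer);
        entity.setContentEncoding("gzip");
        entity.setContentType("application/json; charset=utf-8");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        entity.writeTo(out);
        System.out.println("compressed request body: " + out.size() + " bytes");
    }
}
```

Wrapping the compression in a `ContentProducer` defers the work until the request body is actually written, which is presumably why the PR adds a template-based entity alongside the existing `getHttpEntity()` rather than compressing eagerly inside the batch.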
17 changes: 10 additions & 7 deletions src/main/java/com/splunk/hecclient/Indexer.java
@@ -15,21 +15,20 @@
*/
package com.splunk.hecclient;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.splunk.kafka.connect.VersionUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.JsonNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -121,8 +120,12 @@ public boolean send(final EventBatch batch) {
String url = baseUrl + endpoint;
final HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(headers);
httpPost.setEntity(batch.getHttpEntity());

if (batch.isEnableCompression()) {
httpPost.setHeader("Content-Encoding", "gzip");
httpPost.setEntity(batch.getHttpEntityTemplate());
} else {
httpPost.setEntity(batch.getHttpEntity());
}
String resp;
try {
resp = executeHttpRequest(httpPost);
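Because the indexer only advertises `Content-Encoding: gzip` and swaps in the compressed entity, the receiving end has to gunzip the body; that round trip is also what the unit tests added later in this PR verify. A JDK-only sketch with a made-up payload:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTripSketch {
    public static void main(String[] args) throws IOException {
        byte[] original = "{\"event\":\"hello world!\"}\n".getBytes(StandardCharsets.UTF_8);

        // Compress, as GzipDataContentProducer does when the request body is written.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(original);
        }

        // Decompress, as an HEC endpoint honoring Content-Encoding: gzip would.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
            byte[] buffer = new byte[1024];
            int read;
            while ((read = gunzip.read(buffer)) != -1) {
                restored.write(buffer, 0, read);
            }
        }

        System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8));
    }
}
```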
src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -16,14 +16,17 @@
package com.splunk.kafka.connect;

import com.splunk.hecclient.HecConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class SplunkSinkConnectorConfig extends AbstractConfig {
// General
@@ -45,6 +48,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String HEC_THREDS_CONF = "splunk.hec.threads";
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
// Acknowledgement Parameters
// Use Ack
static final String ACK_CONF = "splunk.hec.ack.enabled";
@@ -108,6 +112,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
+ "Socket timeout.By default, this is set to 60 seconds.";
static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS "
+ "certification validation. By default, this is set to true.";
static final String ENABLE_COMPRESSSION_DOC = "Valid settings are true or false. Enables or disables gzip compression. By default, this is set to false.";
// Acknowledgement Parameters
// Use Ack
static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will "
@@ -189,6 +194,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final int numberOfThreads;
final int socketTimeout;
final boolean validateCertificates;
final boolean enableCompression;
final int lbPollInterval;

final boolean ack;
@@ -259,6 +265,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
headerSource = getString(HEADER_SOURCE_CONF);
headerSourcetype = getString(HEADER_SOURCETYPE_CONF);
headerHost = getString(HEADER_HOST_CONF);
enableCompression = getBoolean(ENABLE_COMPRESSSION_CONF);
}

public static ConfigDef conf() {
@@ -297,7 +304,8 @@ public static ConfigDef conf() {
.define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC)
.define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC)
.define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC)
.define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC);
.define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC)
.define(ENABLE_COMPRESSSION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ENABLE_COMPRESSSION_DOC);
}

/**
@@ -362,6 +370,7 @@ public String toString() {
+ "headerSource:" + headerSource + ", "
+ "headerSourcetype:" + headerSourcetype + ", "
+ "headerHost:" + headerHost + ", "
+ "enableCompression:" + enableCompression + ", "
+ "lbPollInterval:" + lbPollInterval;
}

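A pared-down sketch of how a boolean option with a `false` default is defined and read through Kafka's `ConfigDef`, the same mechanism the `ENABLE_COMPRESSSION_CONF` definition above goes through. The class name and doc string here are illustrative, not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class BooleanConfigSketch {
    static final String ENABLE_COMPRESSION = "splunk.hec.enable.compression";

    static final ConfigDef DEF = new ConfigDef()
            .define(ENABLE_COMPRESSION, ConfigDef.Type.BOOLEAN, false,
                    ConfigDef.Importance.MEDIUM, "Enables or disables gzip compression.");

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(ENABLE_COMPRESSION, "true"); // parsed to Boolean by ConfigDef

        AbstractConfig config = new AbstractConfig(DEF, props);
        boolean enableCompression = config.getBoolean(ENABLE_COMPRESSION);
        System.out.println("enableCompression = " + enableCompression); // true; false when unset
    }
}
```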
5 changes: 5 additions & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -165,10 +165,12 @@ private void handleRaw(final Collection<SinkRecord> records) {
Map<TopicPartition, Collection<SinkRecord>> partitionedRecords = partitionRecords(records);
for (Map.Entry<TopicPartition, Collection<SinkRecord>> entry: partitionedRecords.entrySet()) {
EventBatch batch = createRawEventBatch(entry.getKey());
batch.setEnableCompression(connectorConfig.enableCompression);
sendEvents(entry.getValue(), batch);
}
} else {
EventBatch batch = createRawEventBatch(null);
batch.setEnableCompression(connectorConfig.enableCompression);
sendEvents(records, batch);
}
}
@@ -193,6 +195,7 @@ private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
ArrayList<SinkRecord> recordArrayList = set.getValue();

EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey);
batch.setEnableCompression(connectorConfig.enableCompression);
sendEvents(recordArrayList, batch);
}
log.debug("{} records have been bucketed in to {} batches", records.size(), recordsWithSameHeaders.size());
@@ -260,6 +263,7 @@ public String insertHeaderToken() {

private void handleEvent(final Collection<SinkRecord> records) {
EventBatch batch = new JsonEventBatch();
batch.setEnableCompression(connectorConfig.enableCompression);
sendEvents(records, batch);
}

@@ -283,6 +287,7 @@ private void sendEvents(final Collection<SinkRecord> records, EventBatch batch) {
send(batch);
// start a new batch after send
batch = batch.createFromThis();
batch.setEnableCompression(connectorConfig.enableCompression);
}
}

44 changes: 44 additions & 0 deletions src/test/java/com/splunk/hecclient/IndexerTest.java
@@ -194,4 +194,48 @@ private Indexer assertFailure(CloseableHttpClient client) {
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
return indexer;
}

@Test
public void sendCompressedBatchWithSuccess() {
for (int i = 0; i < 2; i++) {
CloseableHttpClientMock client = new CloseableHttpClientMock();
if (i == 0) {
client.setResponse(CloseableHttpClientMock.success);
}
PollerMock poller = new PollerMock();

Indexer indexer = new Indexer(baseUrl, token, client, poller);
EventBatch batch = UnitUtil.createBatch();
batch.setEnableCompression(true);
boolean result = indexer.send(batch);
Assert.assertTrue(result);
Assert.assertNotNull(poller.getBatch());
Assert.assertNull(poller.getFailedBatch());
Assert.assertNull(poller.getException());
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
}
}

@Test
public void sendCompressedRawBatchWithSuccess() {
for (int i = 0; i < 2; i++) {
CloseableHttpClientMock client = new CloseableHttpClientMock();
if (i == 0) {
client.setResponse(CloseableHttpClientMock.success);
}
PollerMock poller = new PollerMock();

Indexer indexer = new Indexer(baseUrl, token, client, poller);
EventBatch batch = UnitUtil.createRawEventBatch();
batch.setEnableCompression(true);
boolean result = indexer.send(batch);
Assert.assertTrue(result);
Assert.assertNotNull(poller.getBatch());
Assert.assertNull(poller.getFailedBatch());
Assert.assertNull(poller.getException());
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
}
}
}
30 changes: 29 additions & 1 deletion src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java
@@ -19,13 +19,14 @@
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

public class JsonEvenBatchTest {
@Test
@@ -177,4 +178,31 @@ private int readContent(final HttpEntity entity, byte[] data) {

return UnitUtil.read(in, data);
}

@Test
public void testGZIPCompressionForJsonEvent() {
EventBatch batch = new JsonEventBatch();
batch.setEnableCompression(true);
Assert.assertTrue(batch.isEnableCompression());
Event event = new JsonEvent("hello world! hello world! hello world!", "hao");
batch.add(event);
HttpEntity entity = batch.getHttpEntityTemplate();
byte[] data = new byte[1024];
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
entity.writeTo(out);
String expected = "{\"event\":\"hello world! hello world! hello world!\"}\n";
ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray());
GZIPInputStream gis = new GZIPInputStream(bis);
int read = gis.read(data, 0, data.length);
gis.close();
bis.close();

// Decode the bytes into a String
String ori = new String(data, 0, read, "UTF-8");
Assert.assertEquals(expected, ori);
} catch (IOException ex) {
Assert.assertTrue("failed to compress and decompress the data", false);
throw new HecException("failed to compress and decompress the data", ex);
}
}
}
32 changes: 32 additions & 0 deletions src/test/java/com/splunk/hecclient/RawEventBatchTest.java
@@ -15,10 +15,15 @@
*/
package com.splunk.hecclient;

import org.apache.http.HttpEntity;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class RawEventBatchTest {
@Test
@@ -122,4 +127,31 @@ public void checkEquals() {

Assert.assertFalse(batchOne.equals(batchTwo));
}

@Test
public void testGZIPCompressionForRaw() {
EventBatch batch = RawEventBatch.factory().build();
batch.setEnableCompression(true);
Assert.assertTrue(batch.isEnableCompression());
Event event = new RawEvent("hello world! hello world! hello world!", null);
batch.add(event);
HttpEntity entity = batch.getHttpEntityTemplate();
byte[] data = new byte[1024];
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
entity.writeTo(out);
String expected = "hello world! hello world! hello world!";
ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray());
GZIPInputStream gis = new GZIPInputStream(bis);
int read = gis.read(data, 0, data.length);
gis.close();
bis.close();

// Decode the bytes into a String
String ori = new String(data, 0, read, "UTF-8");
Assert.assertEquals(expected, ori);
} catch (IOException ex) {
Assert.assertTrue("failed to compress and decompress the data", false);
throw new HecException("failed to compress and decompress the data", ex);
}
}
}