From 547d8b44b428d682eba96028d4633d2da4d4418b Mon Sep 17 00:00:00 2001 From: ankit-splunk Date: Fri, 12 Nov 2021 17:28:30 +0530 Subject: [PATCH 1/4] Added GZIP compression support --- README.md | 1 + .../java/com/splunk/hecclient/EventBatch.java | 59 +++++++++++++++---- .../java/com/splunk/hecclient/Indexer.java | 17 +++--- .../connect/SplunkSinkConnectorConfig.java | 19 ++++-- .../splunk/kafka/connect/SplunkSinkTask.java | 5 ++ .../com/splunk/hecclient/IndexerTest.java | 22 +++++++ .../splunk/hecclient/JsonEvenBatchTest.java | 30 +++++++++- .../splunk/hecclient/RawEventBatchTest.java | 32 ++++++++++ 8 files changed, 160 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 39c92d5f..ace8df16 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,7 @@ Use the below schema to configure Splunk Connect for Kafka | `splunk.hec.max.retries` | Amount of times a failed batch will attempt to resend before dropping events completely. Warning: This will result in data loss, default is `-1` which will retry indefinitely | `-1` | | `splunk.hec.backoff.threshhold.seconds` | The amount of time Splunk Connect for Kafka waits to attempt resending after errors from a HEC endpoint." | `60` | | `splunk.hec.lb.poll.interval` | Specify this parameter(in seconds) to control the polling interval(increase to do less polling, decrease to do more frequent polling) | `120` | +| `splunk.hec.enable.compression` | Valid settings are true or false. Used for enable or disable gzip-compression. |`false`| ### Acknowledgement Parameters #### Use Ack | Name | Description | Default Value | diff --git a/src/main/java/com/splunk/hecclient/EventBatch.java b/src/main/java/com/splunk/hecclient/EventBatch.java index 91ef550c..2d747a35 100644 --- a/src/main/java/com/splunk/hecclient/EventBatch.java +++ b/src/main/java/com/splunk/hecclient/EventBatch.java @@ -17,19 +17,14 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.AbstractHttpEntity; +import org.apache.http.entity.ContentProducer; +import org.apache.http.entity.EntityTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.SequenceInputStream; - -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.io.*; +import java.util.*; +import java.util.zip.GZIPOutputStream; public abstract class EventBatch { private static Logger log = LoggerFactory.getLogger(EventBatch.class); @@ -42,19 +37,23 @@ public abstract class EventBatch { private volatile int status = INIT; private int failureCount = 0; + private boolean enableCompression; private long sendTimestamp = System.currentTimeMillis() / 1000; // in seconds protected int len; protected List events = new ArrayList<>(); public abstract String getRestEndpoint(); + public abstract String getContentType(); + public abstract void add(Event event); + public abstract EventBatch createFromThis(); public final void addExtraFields(final Map fields) { // recalculate the batch length since we inject more meta data to each event int newLength = 0; - for (final Event event: events) { + for (final Event event : events) { event.addFields(fields); newLength += event.length(); } @@ -129,11 +128,18 @@ public final HttpEntity getHttpEntity() { return e; } + public final HttpEntity getHttpEntityTemplate() { + AbstractHttpEntity e = new EntityTemplate(new GzipDataContentProducer()); + e.setContentEncoding("gzip"); + 
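// lazy producer: the HTTP client invokes GzipDataContentProducer when it writes this entity, so the batched events are serialized and gzip-compressed at send time
+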
e.setContentType(getContentType()); + return e; + } + @Override public final String toString() { StringBuilder builder = new StringBuilder(); builder.append("["); - for (Event e: events) { + for (Event e : events) { builder.append(e.toString()); builder.append(","); } @@ -141,6 +147,35 @@ public final String toString() { return builder.toString(); } + public boolean isEnableCompression() { + return enableCompression; + } + + public void setEnableCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + } + + public final byte[] getDataOfBatch() throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + for (final Event e : events) { + e.writeTo(bos); + } + byte[] unCompressBytes = bos.toByteArray(); + return unCompressBytes; + } + } + + private class GzipDataContentProducer implements ContentProducer { + + @Override + public void writeTo(OutputStream outputStream) throws IOException { + OutputStream out = new GZIPOutputStream(outputStream); + out.write(getDataOfBatch()); + out.flush(); + out.close(); + } + } + private class HttpEventBatchEntity extends AbstractHttpEntity { @Override public boolean isRepeatable() { diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index fa5d418c..2c538431 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -15,21 +15,20 @@ */ package com.splunk.hecclient; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.splunk.kafka.connect.VersionUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.client.methods.HttpPost; import org.apache.http.message.BasicHeader; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.JsonNode; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,8 +120,12 @@ public boolean send(final EventBatch batch) { String url = baseUrl + endpoint; final HttpPost httpPost = new HttpPost(url); httpPost.setHeaders(headers); - httpPost.setEntity(batch.getHttpEntity()); - + if (batch.isEnableCompression()) { + httpPost.setHeader("Content-Encoding", "gzip"); + httpPost.setEntity(batch.getHttpEntityTemplate()); + } else { + httpPost.setEntity(batch.getHttpEntity()); + } String resp; try { resp = executeHttpRequest(httpPost); diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 62af3643..585e07ce 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -16,14 +16,17 @@ package com.splunk.kafka.connect; import com.splunk.hecclient.HecConfig; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.commons.lang3.StringUtils; import 
org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkTask; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public final class SplunkSinkConnectorConfig extends AbstractConfig { // General @@ -45,6 +48,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String HEC_THREDS_CONF = "splunk.hec.threads"; static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs"; + static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression"; // Acknowledgement Parameters // Use Ack static final String ACK_CONF = "splunk.hec.ack.enabled"; @@ -108,6 +112,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { + "Socket timeout.By default, this is set to 60 seconds."; static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS " + "certification validation. By default, this is set to true."; + static final String ENABLE_COMPRESSSION_DOC = "Valid settings are true or false. Used for enable or disable gzip-compression. By default, this is set to false."; // Acknowledgement Parameters // Use Ack static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will " @@ -189,6 +194,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final int numberOfThreads; final int socketTimeout; final boolean validateCertificates; + final boolean enableCompression; final int lbPollInterval; final boolean ack; @@ -259,6 +265,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { headerSource = getString(HEADER_SOURCE_CONF); headerSourcetype = getString(HEADER_SOURCETYPE_CONF); headerHost = getString(HEADER_HOST_CONF); + enableCompression = getBoolean(ENABLE_COMPRESSSION_CONF); } public static ConfigDef conf() { @@ -297,7 +304,8 @@ public static ConfigDef conf() { .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC) - .define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC); + .define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC) + .define(ENABLE_COMPRESSSION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ENABLE_COMPRESSSION_DOC); } /** @@ -362,6 +370,7 @@ public String toString() { + "headerSource:" + headerSource + ", " + "headerSourcetype:" + headerSourcetype + ", " + "headerHost:" + headerHost + ", " + + "enableCompression:" + enableCompression + ", " + "lbPollInterval:" + lbPollInterval; } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index a567a881..82307fb5 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java 
@@ -165,10 +165,12 @@ private void handleRaw(final Collection records) { Map> partitionedRecords = partitionRecords(records); for (Map.Entry> entry: partitionedRecords.entrySet()) { EventBatch batch = createRawEventBatch(entry.getKey()); + batch.setEnableCompression(connectorConfig.enableCompression); sendEvents(entry.getValue(), batch); } } else { EventBatch batch = createRawEventBatch(null); + batch.setEnableCompression(connectorConfig.enableCompression); sendEvents(records, batch); } } @@ -193,6 +195,7 @@ private void handleRecordsWithHeader(final Collection records) { ArrayList recordArrayList = set.getValue(); EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey); + batch.setEnableCompression(connectorConfig.enableCompression); sendEvents(recordArrayList, batch); } log.debug("{} records have been bucketed in to {} batches", records.size(), recordsWithSameHeaders.size()); @@ -260,6 +263,7 @@ public String insertHeaderToken() { private void handleEvent(final Collection records) { EventBatch batch = new JsonEventBatch(); + batch.setEnableCompression(connectorConfig.enableCompression); sendEvents(records, batch); } @@ -283,6 +287,7 @@ private void sendEvents(final Collection records, EventBatch batch) send(batch); // start a new batch after send batch = batch.createFromThis(); + batch.setEnableCompression(connectorConfig.enableCompression); } } diff --git a/src/test/java/com/splunk/hecclient/IndexerTest.java b/src/test/java/com/splunk/hecclient/IndexerTest.java index 0f17d2bf..0bda509c 100644 --- a/src/test/java/com/splunk/hecclient/IndexerTest.java +++ b/src/test/java/com/splunk/hecclient/IndexerTest.java @@ -194,4 +194,26 @@ private Indexer assertFailure(CloseableHttpClient client) { Assert.assertEquals(indexer.getChannel(), poller.getChannel()); return indexer; } + + @Test + public void sendCompressedBatchWithSuccess() { + for (int i = 0; i < 2; i++) { + CloseableHttpClientMock client = new CloseableHttpClientMock(); + if (i == 0) { + client.setResponse(CloseableHttpClientMock.success); + } + PollerMock poller = new PollerMock(); + + Indexer indexer = new Indexer(baseUrl, token, client, poller); + EventBatch batch = UnitUtil.createBatch(); + batch.setEnableCompression(true); + boolean result = indexer.send(batch); + Assert.assertTrue(result); + Assert.assertNotNull(poller.getBatch()); + Assert.assertNull(poller.getFailedBatch()); + Assert.assertNull(poller.getException()); + Assert.assertEquals(indexer.getChannel(), poller.getChannel()); + Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse()); + } + } } diff --git a/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java b/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java index 0a8709e5..bffe5c29 100644 --- a/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java +++ b/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java @@ -19,13 +19,14 @@ import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; public class JsonEvenBatchTest { @Test @@ -177,4 +178,31 @@ private int readContent(final HttpEntity entity, byte[] data) { return UnitUtil.read(in, data); } + + @Test + public void testGZIPCompressionForJsonEvent() { + EventBatch batch = new JsonEventBatch(); + batch.setEnableCompression(true); + 
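// confirm the flag set above round-trips through the getter before exercising the gzip entity
+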
Assert.assertTrue(batch.isEnableCompression()); + Event event = new JsonEvent("hello world! hello world! hello world!", "hao"); + batch.add(event); + HttpEntity entity = batch.getHttpEntityTemplate(); + byte[] data = new byte[1024]; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + entity.writeTo(out); + String expected = "{\"event\":\"hello world! hello world! hello world!\"}\n"; + ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray()); + GZIPInputStream gis = new GZIPInputStream(bis); + int read = gis.read(data, 0, data.length); + gis.close(); + bis.close(); + + // Decode the bytes into a String + String ori = new String(data, 0, read, "UTF-8"); + Assert.assertEquals(expected, ori); + } catch (IOException ex) { + Assert.assertTrue("failed to compress and decompress the data", false); + throw new HecException("failed to compress and decompress the data", ex); + } + } } diff --git a/src/test/java/com/splunk/hecclient/RawEventBatchTest.java b/src/test/java/com/splunk/hecclient/RawEventBatchTest.java index 495fd8d3..dfd528a4 100644 --- a/src/test/java/com/splunk/hecclient/RawEventBatchTest.java +++ b/src/test/java/com/splunk/hecclient/RawEventBatchTest.java @@ -15,10 +15,15 @@ */ package com.splunk.hecclient; +import org.apache.http.HttpEntity; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.List; +import java.util.zip.GZIPInputStream; public class RawEventBatchTest { @Test @@ -122,4 +127,31 @@ public void checkEquals() { Assert.assertFalse(batchOne.equals(batchTwo)); } + + @Test + public void testGZIPCompressionForRaw() { + EventBatch batch = RawEventBatch.factory().build(); + batch.setEnableCompression(true); + Assert.assertTrue(batch.isEnableCompression()); + Event event = new RawEvent("hello world! hello world! hello world!", null); + batch.add(event); + HttpEntity entity = batch.getHttpEntityTemplate(); + byte[] data = new byte[1024]; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + entity.writeTo(out); + String expected = "hello world! hello world! 
hello world!"; + ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray()); + GZIPInputStream gis = new GZIPInputStream(bis); + int read = gis.read(data, 0, data.length); + gis.close(); + bis.close(); + + // Decode the bytes into a String + String ori = new String(data, 0, read, "UTF-8"); + Assert.assertEquals(expected, ori); + } catch (IOException ex) { + Assert.assertTrue("failed to compress and decompress the data", false); + throw new HecException("failed to compress and decompress the data", ex); + } + } } From 1f0a3003ea519bc9a9d8692cd1657e5977a7b6d8 Mon Sep 17 00:00:00 2001 From: ankit-splunk Date: Mon, 15 Nov 2021 19:27:41 +0530 Subject: [PATCH 2/4] Change pyyaml version to 5.4.1 --- test/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/requirements.txt b/test/requirements.txt index 71cfebe6..bcbd5857 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -1,6 +1,6 @@ pytest requests kafka-python -pyyaml +pyyaml == 5.4.1 jinja2 jsonpath \ No newline at end of file From d2f191627aae9a83d08bfc8d340d92e5b37cdec4 Mon Sep 17 00:00:00 2001 From: ankit-splunk Date: Tue, 16 Nov 2021 10:32:42 +0530 Subject: [PATCH 3/4] Added kafka version 3.0.0 in CI pipeline --- .github/workflows/ci_build_test.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index 0bfcd3e1..25062900 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -57,6 +57,8 @@ jobs: kafka_package: "kafka_2.13-2.7.1.tgz" - kafka_version: "2.8.0" kafka_package: "kafka_2.13-2.8.0.tgz" + - kafka_version: "3.0.0" + kafka_package: "kafka_2.13-3.0.0.tgz" env: CI_SPLUNK_VERSION: "8.2.2" CI_SPLUNK_FILENAME: splunk-8.2.2-87344edfcdb4-Linux-x86_64.tgz From 93f8330cd974cc4c18bfcc7b04b8945e3048e9ab Mon Sep 17 00:00:00 2001 From: ankit-splunk Date: Fri, 19 Nov 2021 15:41:34 +0530 Subject: [PATCH 4/4] code reformating and added test case --- .../java/com/splunk/hecclient/EventBatch.java | 7 ++---- .../com/splunk/hecclient/IndexerTest.java | 22 +++++++++++++++++++ .../java/com/splunk/hecclient/UnitUtil.java | 7 ++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/EventBatch.java b/src/main/java/com/splunk/hecclient/EventBatch.java index 2d747a35..352108f6 100644 --- a/src/main/java/com/splunk/hecclient/EventBatch.java +++ b/src/main/java/com/splunk/hecclient/EventBatch.java @@ -43,17 +43,14 @@ public abstract class EventBatch { protected List events = new ArrayList<>(); public abstract String getRestEndpoint(); - public abstract String getContentType(); - public abstract void add(Event event); - public abstract EventBatch createFromThis(); public final void addExtraFields(final Map fields) { // recalculate the batch length since we inject more meta data to each event int newLength = 0; - for (final Event event : events) { + for (final Event event: events) { event.addFields(fields); newLength += event.length(); } @@ -139,7 +136,7 @@ public final HttpEntity getHttpEntityTemplate() { public final String toString() { StringBuilder builder = new StringBuilder(); builder.append("["); - for (Event e : events) { + for (Event e: events) { builder.append(e.toString()); builder.append(","); } diff --git a/src/test/java/com/splunk/hecclient/IndexerTest.java b/src/test/java/com/splunk/hecclient/IndexerTest.java index 0bda509c..ff61d6a1 100644 --- a/src/test/java/com/splunk/hecclient/IndexerTest.java +++ 
b/src/test/java/com/splunk/hecclient/IndexerTest.java @@ -216,4 +216,26 @@ public void sendCompressedBatchWithSuccess() { Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse()); } } + + @Test + public void sendCompressedRawBatchWithSuccess() { + for (int i = 0; i < 2; i++) { + CloseableHttpClientMock client = new CloseableHttpClientMock(); + if (i == 0) { + client.setResponse(CloseableHttpClientMock.success); + } + PollerMock poller = new PollerMock(); + + Indexer indexer = new Indexer(baseUrl, token, client, poller); + EventBatch batch = UnitUtil.createRawEventBatch(); + batch.setEnableCompression(true); + boolean result = indexer.send(batch); + Assert.assertTrue(result); + Assert.assertNotNull(poller.getBatch()); + Assert.assertNull(poller.getFailedBatch()); + Assert.assertNull(poller.getException()); + Assert.assertEquals(indexer.getChannel(), poller.getChannel()); + Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse()); + } + } } diff --git a/src/test/java/com/splunk/hecclient/UnitUtil.java b/src/test/java/com/splunk/hecclient/UnitUtil.java index eff9181f..7a5650c4 100644 --- a/src/test/java/com/splunk/hecclient/UnitUtil.java +++ b/src/test/java/com/splunk/hecclient/UnitUtil.java @@ -34,6 +34,13 @@ public static EventBatch createBatch() { return batch; } + public static EventBatch createRawEventBatch() { + Event event = new RawEvent("ni", "hao"); + EventBatch batch = RawEventBatch.factory().build(); + batch.add(event); + return batch; + } + public static void milliSleep(long milliseconds) { try { TimeUnit.MILLISECONDS.sleep(milliseconds);