diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index 953b7ca0..a1442fac 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -4,7 +4,7 @@ com.github.splunk.kafka.connect splunk-kafka-connect splunk-kafka-connect - v1.0.0-LAR + v1.1.0 @@ -115,36 +115,6 @@ - - com.fasterxml.jackson.core - jackson-core - 2.9.5 - provided - - - com.fasterxml.jackson.core - jackson-databind - 2.9.5 - provided - - - jackson-annotations - com.fasterxml.jackson.core - - - - - org.apache.kafka - connect-api - 1.1.0 - provided - - - kafka-clients - org.apache.kafka - - - org.junit.jupiter junit-jupiter-api @@ -209,42 +179,18 @@ - - commons-logging - commons-logging - 1.2 - provided - - - commons-codec - commons-codec - 1.11 - provided - org.slf4j slf4j-simple 1.7.25 test - - org.slf4j - slf4j-api - 1.7.25 - provided - org.apiguardian apiguardian-api 1.0.0 test - - org.apache.commons - commons-lang3 - 3.7 - provided - diff --git a/kafka-connect-splunk.iml b/kafka-connect-splunk.iml index 63333b0a..2258fa16 100644 --- a/kafka-connect-splunk.iml +++ b/kafka-connect-splunk.iml @@ -12,13 +12,18 @@ - - - - - - - + + + + + + + + + + + + @@ -30,12 +35,12 @@ - - + + - + - + \ No newline at end of file diff --git a/pom.xml b/pom.xml index fc82cdab..77d4b094 100644 --- a/pom.xml +++ b/pom.xml @@ -296,4 +296,4 @@ - + \ No newline at end of file diff --git a/src/changes b/src/changes new file mode 100644 index 00000000..b7a0ebae --- /dev/null +++ b/src/changes @@ -0,0 +1,1225 @@ +From 588e947fe0e9584c62c8549207dae818d03f9281 Mon Sep 17 00:00:00 2001 +From: Vitalii Rudenskyi +Date: Fri, 6 Jul 2018 11:25:07 -0700 +Subject: [PATCH] * config style for topic metadata + +--- + .gitignore | 3 +- + .../java/com/splunk/hecclient/JsonEventBatch.java | 24 + + .../java/com/splunk/hecclient/RawEventBatch.java | 30 +- + .../kafka/connect/SplunkSinkConnectorConfig.java | 692 +++++++++++---------- + .../com/splunk/kafka/connect/SplunkSinkTask.java | 142 +++-- + .../connect/SplunkSinkConnectorConfigTest.java | 30 +- + .../splunk/kafka/connect/SplunkSinkTaskTest.java | 55 +- + .../java/com/splunk/kafka/connect/UnitUtil.java | 6 +- + 8 files changed, 593 insertions(+), 389 deletions(-) + +diff --git a/.gitignore b/.gitignore +index d9ed333..f31ed44 100644 +--- a/.gitignore ++++ b/.gitignore +@@ -27,4 +27,5 @@ target/* + splunk-kafka-connect/ + pom.xml.versionsBackup + .classpath +-.project +\ No newline at end of file ++.project ++/target/ +diff --git a/src/main/java/com/splunk/hecclient/JsonEventBatch.java b/src/main/java/com/splunk/hecclient/JsonEventBatch.java +index 1f7f45a..43f81bd 100644 +--- a/src/main/java/com/splunk/hecclient/JsonEventBatch.java ++++ b/src/main/java/com/splunk/hecclient/JsonEventBatch.java +@@ -15,6 +15,9 @@ + */ + package com.splunk.hecclient; + ++import org.apache.commons.lang3.builder.EqualsBuilder; ++import org.apache.commons.lang3.builder.HashCodeBuilder; ++ + public final class JsonEventBatch extends EventBatch { + public static final String endpoint = "/services/collector/event"; + public static final String contentType = "application/json; profile=urn:splunk:event:1.0; charset=utf-8"; +@@ -43,4 +46,25 @@ public final class JsonEventBatch extends EventBatch { + public EventBatch createFromThis() { + return new JsonEventBatch(); + } ++ ++ ++ @Override ++ public int hashCode() { ++ return new HashCodeBuilder() ++ .append(endpoint) ++ .toHashCode(); ++ } ++ ++ @Override ++ public boolean equals(Object obj) { ++ if (obj instanceof JsonEventBatch) { ++ final JsonEventBatch 
other = (JsonEventBatch) obj; ++ return new EqualsBuilder() ++ .append(endpoint, other.endpoint) ++ .isEquals(); ++ } else { ++ return false; ++ } ++ } ++ + } +diff --git a/src/main/java/com/splunk/hecclient/RawEventBatch.java b/src/main/java/com/splunk/hecclient/RawEventBatch.java +index a5fbc47..905d25b 100644 +--- a/src/main/java/com/splunk/hecclient/RawEventBatch.java ++++ b/src/main/java/com/splunk/hecclient/RawEventBatch.java +@@ -15,6 +15,8 @@ + */ + package com.splunk.hecclient; + ++import org.apache.commons.lang3.builder.EqualsBuilder; ++import org.apache.commons.lang3.builder.HashCodeBuilder; + import org.apache.http.client.utils.URIBuilder; + + public final class RawEventBatch extends EventBatch { +@@ -87,7 +89,7 @@ public final class RawEventBatch extends EventBatch { + return this; + } + +- public Builder setTime(final int time) { ++ public Builder setTime(final long time) { + this.time = time; + return this; + } +@@ -146,4 +148,30 @@ public final class RawEventBatch extends EventBatch { + params.addParameter(tag, val); + } + } ++ ++ @Override ++ public int hashCode() { ++ return new HashCodeBuilder() ++ .append(index) ++ .append(sourcetype) ++ .append(source) ++ .append(host) ++ .toHashCode(); ++ } ++ ++ @Override ++ public boolean equals(Object obj) { ++ if (obj instanceof RawEventBatch) { ++ final RawEventBatch other = (RawEventBatch) obj; ++ return new EqualsBuilder() ++ .append(index, other.index) ++ .append(sourcetype, other.sourcetype) ++ .append(source, other.source) ++ .append(host, other.host) ++ .isEquals(); ++ } else { ++ return false; ++ } ++ } ++ + } +diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +index 25d2c1f..10c95af 100644 +--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java ++++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +@@ -18,6 +18,7 @@ package com.splunk.kafka.connect; + import com.splunk.hecclient.HecConfig; + import org.apache.kafka.common.config.ConfigException; + import org.apache.kafka.connect.sink.SinkConnector; ++import org.apache.kafka.connect.sink.SinkTask; + import org.apache.kafka.common.config.AbstractConfig; + import org.apache.kafka.common.config.ConfigDef; + import org.apache.commons.lang3.StringUtils; +@@ -25,332 +26,395 @@ import org.apache.commons.lang3.StringUtils; + import java.util.*; + + public final class SplunkSinkConnectorConfig extends AbstractConfig { +- // General +- static final String INDEX = "index"; +- static final String SOURCE = "source"; +- static final String SOURCETYPE = "sourcetype"; +- // Required Parameters +- static final String URI_CONF = "splunk.hec.uri"; +- static final String TOKEN_CONF = "splunk.hec.token"; +- // General Parameters +- static final String INDEX_CONF = "splunk.indexes"; +- static final String SOURCE_CONF = "splunk.sources"; +- static final String SOURCETYPE_CONF = "splunk.sourcetypes"; +- static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels"; +- static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel"; +- static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count +- static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive"; +- static final String HEC_THREDS_CONF = "splunk.hec.threads"; +- static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds +- static final String SSL_VALIDATE_CERTIFICATES_CONF = 
"splunk.hec.ssl.validate.certs"; +- // Acknowledgement Parameters +- // Use Ack +- static final String ACK_CONF = "splunk.hec.ack.enabled"; +- static final String ACK_POLL_INTERVAL_CONF = "splunk.hec.ack.poll.interval"; // seconds +- static final String ACK_POLL_THREADS_CONF = "splunk.hec.ack.poll.threads"; +- static final String EVENT_TIMEOUT_CONF = "splunk.hec.event.timeout"; // seconds +- static final String MAX_OUTSTANDING_EVENTS_CONF = "splunk.hec.max.outstanding.events"; +- static final String MAX_RETRIES_CONF = "splunk.hec.max.retries"; +- // Endpoint Parameters +- static final String RAW_CONF = "splunk.hec.raw"; +- // /raw endpoint only +- static final String LINE_BREAKER_CONF = "splunk.hec.raw.line.breaker"; +- // /event endpoint only +- static final String USE_RECORD_TIMESTAMP_CONF = "splunk.hec.use.record.timestamp"; +- static final String ENRICHMENT_CONF = "splunk.hec.json.event.enrichment"; +- static final String TRACK_DATA_CONF = "splunk.hec.track.data"; +- // TBD +- static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path"; +- static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password"; +- +- // Kafka configuration description strings +- // Required Parameters +- static final String URI_DOC = "Splunk HEC URIs. Either a list of FQDNs or IPs of all Splunk indexers, separated " +- + "with a \",\", or a load balancer. The connector will load balance to indexers using " +- + "round robin. Splunk Connector will round robin to this list of indexers. " +- + "https://hec1.splunk.com:8088,https://hec2.splunk.com:8088,https://hec3.splunk.com:8088"; +- static final String TOKEN_DOC = "Splunk Http Event Collector token."; +- // General Parameters +- static final String INDEX_DOC = "Splunk index names for Kafka topic data separated by comma for multiple topics to " +- + "indexers (\"prod-index1,prod-index2,prod-index3\")."; +- static final String SOURCE_DOC = "Splunk event source metadata for Kafka topic data. The same configuration rules " +- + "as indexes can be applied. If left un-configured, the default source binds to" +- + " the HEC token. By default, this setting is empty."; +- static final String SOURCETYPE_DOC = "Splunk event sourcetype metadata for Kafka topic data. The same configuration " +- + "rules as indexes can be applied here. If left unconfigured, the default source" +- + " binds to the HEC token. By default, this setting is empty"; +- static final String TOTAL_HEC_CHANNEL_DOC = "Total HEC Channels used to post events to Splunk. When enabling HEC ACK, " +- + "setting to the same or 2X number of indexers is generally good."; +- static final String MAX_HTTP_CONNECTION_PER_CHANNEL_DOC = "Max HTTP connections pooled for one HEC Channel " +- + "when posting events to Splunk."; +- static final String MAX_BATCH_SIZE_DOC = "Maximum batch size when posting events to Splunk. The size is the actual number of " +- + "Kafka events not the byte size. By default, this is set to 100."; +- static final String HTTP_KEEPALIVE_DOC = "Valid settings are true or false. Enables or disables HTTP connection " +- + "keep-alive. By default, this is set to true"; +- static final String HEC_THREADS_DOC = "Controls how many threads are spawned to do data injection via HEC in a single " +- + "connector task. 
By default, this is set to 1."; +- static final String SOCKET_TIMEOUT_DOC = "Max duration in seconds to read / write data to network before internal TCP " +- + "Socket timeout.By default, this is set to 60 seconds."; +- static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS " +- + "certification validation. By default, this is set to true."; +- // Acknowledgement Parameters +- // Use Ack +- static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will " +- + "poll event ACKs for POST events before check-pointing the Kafka offsets. This is used " +- + "to prevent data loss, as this setting implements guaranteed delivery. By default, this " +- + "setting is set to true."; +- static final String ACK_POLL_INTERVAL_DOC = "This setting is only applicable when splunk.hec.ack.enabled is set to " +- + "true. Internally it controls the event ACKs polling interval. By default, " +- + "this setting is 10 seconds."; +- static final String ACK_POLL_THREADS_DOC = "This setting is used for performance tuning and is only applicable when " +- + "splunk.hec.ack.enabled is set to true. It controls how many threads " +- + "should be spawned to poll event ACKs. By default, this is set to 1."; +- static final String EVENT_TIMEOUT_DOC = "This setting is applicable when splunk.hec.ack.enabled is set to true. " +- + "When events are POSTed to Splunk and before they are ACKed, this setting " +- + "determines how long the connector will wait before timing out and resending. " +- + "By default, this is set to 300 seconds."; +- static final String MAX_OUTSTANDING_EVENTS_DOC = "Maximum amount of un-acknowledged events kept in memory by connector. " +- + "Will trigger back-pressure event to slow collection. By default, this " +- + "is set to 1000000."; +- static final String MAX_RETRIES_DOC = "Number of retries for failed batches before giving up. By default this is set to " +- + "-1 which will retry indefinitely."; +- // Endpoint Parameters +- static final String RAW_DOC = "Set to true in order for Splunk software to ingest data using the the /raw HEC " +- + "endpoint. Default is false, which will use the /event endpoint."; +- // /raw endpoint only +- static final String LINE_BREAKER_DOC = "Only applicable to /raw HEC endpoint. The setting is used to specify a custom " +- + "line breaker to help Splunk separate the events correctly. Note: For example" +- + "you can specify \"#####\" as a special line breaker.By default, this setting is " +- + "empty."; +- // /event endpoint only +- static final String USE_RECORD_TIMESTAMP_DOC = "Valid settings are true or false. When set to `true`, The timestamp " +- + "is retrieved from the Kafka record and passed to Splunk as a HEC meta-data " +- + "override. This will index events in Splunk with the record timestamp. By " +- + "default, this is set to true."; +- static final String ENRICHMENT_DOC = "Only applicable to /event HEC endpoint. This setting is used to enrich raw data " +- + "with extra metadata fields. It contains a list of key value pairs separated by \",\"." +- + " The configured enrichment metadata will be indexed along with raw event data " +- + "by Splunk software. Note: Data enrichment for /event HEC endpoint is only available " +- + "in Splunk Enterprise 6.5 and above. By default, this setting is empty."; +- static final String TRACK_DATA_DOC = "Valid settings are true or false. 
When set to true, data loss and data injection " +- + "latency metadata will be indexed along with raw data. This setting only works in " +- + "conjunction with /event HEC endpoint (\"splunk.hec.raw\" : \"false\"). By default" +- + ", this is set to false."; +- // TBD +- static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store."; +- static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store."; +- +- final String splunkToken; +- final String splunkURI; +- final Map> topicMetas; +- +- final String indexes; +- final String sourcetypes; +- final String sources; +- +- final int totalHecChannels; +- final int maxHttpConnPerChannel; +- final int maxBatchSize; +- final boolean httpKeepAlive; +- final int numberOfThreads; +- final int socketTimeout; +- final boolean validateCertificates; +- +- final boolean ack; +- final int ackPollInterval; +- final int ackPollThreads; +- final int eventBatchTimeout; +- final int maxOutstandingEvents; +- final int maxRetries; +- +- final boolean raw; +- final String lineBreaker; +- final boolean useRecordTimestamp; +- final Map enrichments; +- final boolean trackData; +- +- final boolean hasTrustStorePath; +- final String trustStorePath; +- final String trustStorePassword; +- +- SplunkSinkConnectorConfig(Map taskConfig) { +- super(conf(), taskConfig); +- splunkToken = getPassword(TOKEN_CONF).value(); +- splunkURI = getString(URI_CONF); +- raw = getBoolean(RAW_CONF); +- ack = getBoolean(ACK_CONF); +- indexes = getString(INDEX_CONF); +- sourcetypes = getString(SOURCETYPE_CONF); +- sources = getString(SOURCE_CONF); +- httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF); +- validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF); +- trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF); +- hasTrustStorePath = StringUtils.isNotBlank(trustStorePath); +- trustStorePassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONF).value(); +- eventBatchTimeout = getInt(EVENT_TIMEOUT_CONF); +- ackPollInterval = getInt(ACK_POLL_INTERVAL_CONF); +- ackPollThreads = getInt(ACK_POLL_THREADS_CONF); +- maxHttpConnPerChannel = getInt(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF); +- totalHecChannels = getInt(TOTAL_HEC_CHANNEL_CONF); +- socketTimeout = getInt(SOCKET_TIMEOUT_CONF); +- enrichments = parseEnrichments(getString(ENRICHMENT_CONF)); +- trackData = getBoolean(TRACK_DATA_CONF); +- useRecordTimestamp = getBoolean(USE_RECORD_TIMESTAMP_CONF); +- maxBatchSize = getInt(MAX_BATCH_SIZE_CONF); +- numberOfThreads = getInt(HEC_THREDS_CONF); +- lineBreaker = getString(LINE_BREAKER_CONF); +- maxOutstandingEvents = getInt(MAX_OUTSTANDING_EVENTS_CONF); +- maxRetries = getInt(MAX_RETRIES_CONF); +- topicMetas = initMetaMap(taskConfig); +- } ++ // General ++ static final String INDEX = "index"; ++ static final String SOURCE = "source"; ++ static final String SOURCETYPE = "sourcetype"; ++ static final String HOST = "host"; + +- public static ConfigDef conf() { +- return new ConfigDef() +- .define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC) +- .define(URI_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URI_DOC) +- .define(RAW_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, RAW_DOC) +- .define(ACK_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ACK_DOC) +- .define(INDEX_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, INDEX_DOC) +- .define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC) +- .define(SOURCE_CONF, 
ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC) +- .define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC) +- .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC) +- .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) +- .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) +- .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) +- .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) +- .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC) +- .define(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, MAX_HTTP_CONNECTION_PER_CHANNEL_DOC) +- .define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC) +- .define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) +- .define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ENRICHMENT_DOC) +- .define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC) +- .define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC) +- .define(HEC_THREDS_CONF, ConfigDef.Type.INT, 1, ConfigDef.Importance.LOW, HEC_THREADS_DOC) +- .define(LINE_BREAKER_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, LINE_BREAKER_DOC) +- .define(MAX_OUTSTANDING_EVENTS_CONF, ConfigDef.Type.INT, 1000000, ConfigDef.Importance.MEDIUM, MAX_OUTSTANDING_EVENTS_DOC) +- .define(MAX_RETRIES_CONF, ConfigDef.Type.INT, -1, ConfigDef.Importance.MEDIUM, MAX_RETRIES_DOC) +- .define(MAX_BATCH_SIZE_CONF, ConfigDef.Type.INT, 500, ConfigDef.Importance.MEDIUM, MAX_BATCH_SIZE_DOC); +- } ++ //Headers ++ static final String INDEX_HDR = "splunk.index"; ++ static final String SOURCE_HDR = "splunk.source"; ++ static final String SOURCETYPE_HDR = "splunk.sourcetype"; ++ static final String HOST_HDR = "splunk.host"; ++ static final String TIME_HDR = "splunk.time"; + +- /** +- Configuration Method to setup all settings related to Splunk HEC Client +- */ +- public HecConfig getHecConfig() { +- HecConfig config = new HecConfig(Arrays.asList(splunkURI.split(",")), splunkToken); +- config.setDisableSSLCertVerification(!validateCertificates) +- .setSocketTimeout(socketTimeout) +- .setMaxHttpConnectionPerChannel(maxHttpConnPerChannel) +- .setTotalChannels(totalHecChannels) +- .setEventBatchTimeout(eventBatchTimeout) +- .setHttpKeepAlive(httpKeepAlive) +- .setAckPollInterval(ackPollInterval) +- .setAckPollThreads(ackPollThreads) +- .setEnableChannelTracking(trackData) +- .setTrustStorePath(trustStorePath) +- .setTrustStorePassword(trustStorePassword) +- .setHasCustomTrustStore(hasTrustStorePath); +- return config; +- } ++ // Required Parameters ++ static final String URI_CONF = "splunk.hec.uri"; ++ static final String TOKEN_CONF = "splunk.hec.token"; ++ // General Parameters ++ static final String INDEX_CONF = "splunk.index"; ++ static final String SOURCE_CONF = "splunk.source"; ++ static final String SOURCETYPE_CONF = "splunk.sourcetype"; ++ static final String HOST_CONF = "splunk.host"; ++ static final String INDEXES_CONF = "splunk.indexes"; ++ 
static final String SOURCES_CONF = "splunk.sources"; ++ static final String SOURCETYPES_CONF = "splunk.sourcetypes"; ++ static final String HOSTS_CONF = "splunk.hosts"; ++ ++ static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels"; ++ static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel"; ++ static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count ++ static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive"; ++ static final String HEC_THREDS_CONF = "splunk.hec.threads"; ++ static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds ++ static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs"; ++ // Acknowledgement Parameters ++ // Use Ack ++ static final String ACK_CONF = "splunk.hec.ack.enabled"; ++ static final String ACK_POLL_INTERVAL_CONF = "splunk.hec.ack.poll.interval"; // seconds ++ static final String ACK_POLL_THREADS_CONF = "splunk.hec.ack.poll.threads"; ++ static final String EVENT_TIMEOUT_CONF = "splunk.hec.event.timeout"; // seconds ++ static final String MAX_OUTSTANDING_EVENTS_CONF = "splunk.hec.max.outstanding.events"; ++ static final String MAX_RETRIES_CONF = "splunk.hec.max.retries"; ++ // Endpoint Parameters ++ static final String RAW_CONF = "splunk.hec.raw"; ++ // /raw endpoint only ++ static final String LINE_BREAKER_CONF = "splunk.hec.raw.line.breaker"; ++ // /event endpoint only ++ static final String USE_RECORD_TIMESTAMP_CONF = "splunk.hec.use.record.timestamp"; ++ static final String ENRICHMENT_CONF = "splunk.hec.json.event.enrichment"; ++ static final String TRACK_DATA_CONF = "splunk.hec.track.data"; ++ // TBD ++ static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path"; ++ static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password"; ++ ++ // Kafka configuration description strings ++ // Required Parameters ++ static final String URI_DOC = "Splunk HEC URIs. Either a list of FQDNs or IPs of all Splunk indexers, separated " ++ + "with a \",\", or a load balancer. The connector will load balance to indexers using " ++ + "round robin. Splunk Connector will round robin to this list of indexers. " ++ + "https://hec1.splunk.com:8088,https://hec2.splunk.com:8088,https://hec3.splunk.com:8088"; ++ static final String TOKEN_DOC = "Splunk Http Event Collector token."; ++ // General Parameters ++ static final String INDEXES_DOC = "Splunk index names for Kafka topic data separated by comma for multiple topics to " ++ + "indexers (\"prod-index1,prod-index2,prod-index3\")."; ++ static final String SOURCES_DOC = "Splunk event source metadata for Kafka topic data. The same configuration rules " ++ + "as indexes can be applied. If left un-configured, the default source binds to" ++ + " the HEC token. By default, this setting is empty."; ++ static final String SOURCETYPES_DOC = "Splunk event sourcetype metadata for Kafka topic data. The same configuration " ++ + "rules as indexes can be applied here. If left unconfigured, the default source" ++ + " binds to the HEC token. By default, this setting is empty"; ++ static final String HOSTS_DOC = "Splunk event host metadata for Kafka topic data. The same configuration " ++ + "rules as indexes can be applied here. If left unconfigured, the default source" ++ + " binds to the HEC token. By default, this setting is empty"; ++ ++ static final String INDEX_DOC = "Splunk default index name. 
If unconfigured, the default value binds to the HEC token"; ++ static final String SOURCE_DOC = "Splunk event source. If unconfigured, the default value binds to the HEC token"; ++ static final String SOURCETYPE_DOC = "Splunk event sourcetype. If unconfigured, the default value binds to the HEC token"; ++ static final String HOST_DOC = "Splunk event host. If unconfigured, the default value binds to the HEC token"; ++ ++ static final String TOTAL_HEC_CHANNEL_DOC = "Total HEC Channels used to post events to Splunk. When enabling HEC ACK, " ++ + "setting to the same or 2X number of indexers is generally good."; ++ static final String MAX_HTTP_CONNECTION_PER_CHANNEL_DOC = "Max HTTP connections pooled for one HEC Channel " ++ + "when posting events to Splunk."; ++ static final String MAX_BATCH_SIZE_DOC = "Maximum batch size when posting events to Splunk. The size is the actual number of " ++ + "Kafka events not the byte size. By default, this is set to 100."; ++ static final String HTTP_KEEPALIVE_DOC = "Valid settings are true or false. Enables or disables HTTP connection " ++ + "keep-alive. By default, this is set to true"; ++ static final String HEC_THREADS_DOC = "Controls how many threads are spawned to do data injection via HEC in a single " ++ + "connector task. By default, this is set to 1."; ++ static final String SOCKET_TIMEOUT_DOC = "Max duration in seconds to read / write data to network before internal TCP " ++ + "Socket timeout.By default, this is set to 60 seconds."; ++ static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS " ++ + "certification validation. By default, this is set to true."; ++ // Acknowledgement Parameters ++ // Use Ack ++ static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will " ++ + "poll event ACKs for POST events before check-pointing the Kafka offsets. This is used " ++ + "to prevent data loss, as this setting implements guaranteed delivery. By default, this " ++ + "setting is set to true."; ++ static final String ACK_POLL_INTERVAL_DOC = "This setting is only applicable when splunk.hec.ack.enabled is set to " ++ + "true. Internally it controls the event ACKs polling interval. By default, " ++ + "this setting is 10 seconds."; ++ static final String ACK_POLL_THREADS_DOC = "This setting is used for performance tuning and is only applicable when " ++ + "splunk.hec.ack.enabled is set to true. It controls how many threads " ++ + "should be spawned to poll event ACKs. By default, this is set to 1."; ++ static final String EVENT_TIMEOUT_DOC = "This setting is applicable when splunk.hec.ack.enabled is set to true. " ++ + "When events are POSTed to Splunk and before they are ACKed, this setting " ++ + "determines how long the connector will wait before timing out and resending. " ++ + "By default, this is set to 300 seconds."; ++ static final String MAX_OUTSTANDING_EVENTS_DOC = "Maximum amount of un-acknowledged events kept in memory by connector. " ++ + "Will trigger back-pressure event to slow collection. By default, this " ++ + "is set to 1000000."; ++ static final String MAX_RETRIES_DOC = "Number of retries for failed batches before giving up. By default this is set to " ++ + "-1 which will retry indefinitely."; ++ // Endpoint Parameters ++ static final String RAW_DOC = "Set to true in order for Splunk software to ingest data using the the /raw HEC " ++ + "endpoint. 
Default is false, which will use the /event endpoint."; ++ // /raw endpoint only ++ static final String LINE_BREAKER_DOC = "Only applicable to /raw HEC endpoint. The setting is used to specify a custom " ++ + "line breaker to help Splunk separate the events correctly. Note: For example" ++ + "you can specify \"#####\" as a special line breaker.By default, this setting is " ++ + "empty."; ++ // /event endpoint only ++ static final String USE_RECORD_TIMESTAMP_DOC = "Valid settings are true or false. When set to `true`, The timestamp " ++ + "is retrieved from the Kafka record and passed to Splunk as a HEC meta-data " ++ + "override. This will index events in Splunk with the record timestamp. By " ++ + "default, this is set to true."; ++ static final String ENRICHMENT_DOC = "Only applicable to /event HEC endpoint. This setting is used to enrich raw data " ++ + "with extra metadata fields. It contains a list of key value pairs separated by \",\"." ++ + " The configured enrichment metadata will be indexed along with raw event data " ++ + "by Splunk software. Note: Data enrichment for /event HEC endpoint is only available " ++ + "in Splunk Enterprise 6.5 and above. By default, this setting is empty."; ++ static final String TRACK_DATA_DOC = "Valid settings are true or false. When set to true, data loss and data injection " ++ + "latency metadata will be indexed along with raw data. This setting only works in " ++ + "conjunction with /event HEC endpoint (\"splunk.hec.raw\" : \"false\"). By default" ++ + ", this is set to false."; ++ // TBD ++ static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store."; ++ static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store."; ++ ++ final String splunkToken; ++ final String splunkURI; ++ final Map> topicMetas; ++ ++ final List indexes; ++ final List sourcetypes; ++ final List sources; ++ final List hosts; ++ ++ final String index; ++ final String sourcetype; ++ final String source; ++ final String host; ++ ++ final int totalHecChannels; ++ final int maxHttpConnPerChannel; ++ final int maxBatchSize; ++ final boolean httpKeepAlive; ++ final int numberOfThreads; ++ final int socketTimeout; ++ final boolean validateCertificates; ++ ++ final boolean ack; ++ final int ackPollInterval; ++ final int ackPollThreads; ++ final int eventBatchTimeout; ++ final int maxOutstandingEvents; ++ final int maxRetries; ++ ++ final boolean raw; ++ final String lineBreaker; ++ final boolean useRecordTimestamp; ++ final Map enrichments; ++ final boolean trackData; + +- public boolean hasMetaDataConfigured() { +- return (indexes != null && !indexes.isEmpty() +- || (sources != null && !sources.isEmpty()) +- || (sourcetypes != null && !sourcetypes.isEmpty())); ++ final boolean hasTrustStorePath; ++ final String trustStorePath; ++ final String trustStorePassword; ++ ++ SplunkSinkConnectorConfig(Map taskConfig) { ++ super(conf(), taskConfig); ++ splunkToken = getPassword(TOKEN_CONF).value(); ++ splunkURI = getString(URI_CONF); ++ raw = getBoolean(RAW_CONF); ++ ack = getBoolean(ACK_CONF); ++ indexes = getList(INDEXES_CONF); ++ sourcetypes = getList(SOURCETYPES_CONF); ++ sources = getList(SOURCES_CONF); ++ hosts = getList(HOSTS_CONF); ++ index = getString(INDEX_CONF); ++ sourcetype = getString(SOURCETYPE_CONF); ++ source = getString(SOURCE_CONF); ++ host = getString(HOST_CONF); ++ httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF); ++ validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF); ++ trustStorePath = 
getString(SSL_TRUSTSTORE_PATH_CONF); ++ hasTrustStorePath = StringUtils.isNotBlank(trustStorePath); ++ trustStorePassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONF).value(); ++ eventBatchTimeout = getInt(EVENT_TIMEOUT_CONF); ++ ackPollInterval = getInt(ACK_POLL_INTERVAL_CONF); ++ ackPollThreads = getInt(ACK_POLL_THREADS_CONF); ++ maxHttpConnPerChannel = getInt(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF); ++ totalHecChannels = getInt(TOTAL_HEC_CHANNEL_CONF); ++ socketTimeout = getInt(SOCKET_TIMEOUT_CONF); ++ enrichments = parseEnrichments(getString(ENRICHMENT_CONF)); ++ trackData = getBoolean(TRACK_DATA_CONF); ++ useRecordTimestamp = getBoolean(USE_RECORD_TIMESTAMP_CONF); ++ maxBatchSize = getInt(MAX_BATCH_SIZE_CONF); ++ numberOfThreads = getInt(HEC_THREDS_CONF); ++ lineBreaker = getString(LINE_BREAKER_CONF); ++ maxOutstandingEvents = getInt(MAX_OUTSTANDING_EVENTS_CONF); ++ maxRetries = getInt(MAX_RETRIES_CONF); ++ topicMetas = initMetaMap(taskConfig); ++ } ++ ++ public static ConfigDef conf() { ++ return new ConfigDef() ++ .define(SinkTask.TOPICS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.HIGH, "Sink topics config") ++ .define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC) ++ .define(URI_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URI_DOC) ++ .define(RAW_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, RAW_DOC) ++ .define(ACK_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ACK_DOC) ++ .define(INDEXES_CONF, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, INDEXES_DOC) ++ .define(SOURCETYPES_CONF, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, SOURCETYPES_DOC) ++ .define(SOURCES_CONF, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, SOURCES_DOC) ++ .define(HOSTS_CONF, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, HOSTS_DOC) ++ .define(INDEX_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, INDEX_DOC) ++ .define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC) ++ .define(SOURCE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC) ++ .define(HOST_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, HOST_DOC) ++ .define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC) ++ .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC) ++ .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) ++ .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) ++ .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) ++ .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) ++ .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC) ++ .define(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, MAX_HTTP_CONNECTION_PER_CHANNEL_DOC) ++ .define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC) ++ .define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) ++ .define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", 
ConfigDef.Importance.LOW, ENRICHMENT_DOC) ++ .define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC) ++ .define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC) ++ .define(HEC_THREDS_CONF, ConfigDef.Type.INT, 1, ConfigDef.Importance.LOW, HEC_THREADS_DOC) ++ .define(LINE_BREAKER_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, LINE_BREAKER_DOC) ++ .define(MAX_OUTSTANDING_EVENTS_CONF, ConfigDef.Type.INT, 1000000, ConfigDef.Importance.MEDIUM, MAX_OUTSTANDING_EVENTS_DOC) ++ .define(MAX_RETRIES_CONF, ConfigDef.Type.INT, -1, ConfigDef.Importance.MEDIUM, MAX_RETRIES_DOC) ++ .define(MAX_BATCH_SIZE_CONF, ConfigDef.Type.INT, 500, ConfigDef.Importance.MEDIUM, MAX_BATCH_SIZE_DOC); ++ } ++ ++ /** ++ Configuration Method to setup all settings related to Splunk HEC Client ++ */ ++ public HecConfig getHecConfig() { ++ HecConfig config = new HecConfig(Arrays.asList(splunkURI.split(",")), splunkToken); ++ config.setDisableSSLCertVerification(!validateCertificates) ++ .setSocketTimeout(socketTimeout) ++ .setMaxHttpConnectionPerChannel(maxHttpConnPerChannel) ++ .setTotalChannels(totalHecChannels) ++ .setEventBatchTimeout(eventBatchTimeout) ++ .setHttpKeepAlive(httpKeepAlive) ++ .setAckPollInterval(ackPollInterval) ++ .setAckPollThreads(ackPollThreads) ++ .setEnableChannelTracking(trackData) ++ .setTrustStorePath(trustStorePath) ++ .setTrustStorePassword(trustStorePassword) ++ .setHasCustomTrustStore(hasTrustStorePath); ++ return config; ++ } ++ ++ public boolean hasMetaDataConfigured() { ++ return (indexes != null && !indexes.isEmpty() ++ || (sources != null && !sources.isEmpty()) ++ || (sourcetypes != null && !sourcetypes.isEmpty())); ++ } ++ ++ public String toString() { ++ return "splunkURI:" + splunkURI + ", " ++ + "raw:" + raw + ", " ++ + "ack:" + ack + ", " ++ + "index:" + index + ", " ++ + "sourcetype:" + sourcetype + ", " ++ + "source:" + source + ", " ++ + "host:" + host + ", " ++ + "indexes:" + indexes + ", " ++ + "sourcetypes:" + sourcetypes + ", " ++ + "sources:" + sources + ", " ++ + "hosts:" + hosts + ", " ++ + "httpKeepAlive:" + httpKeepAlive + ", " ++ + "validateCertificates:" + validateCertificates + ", " ++ + "trustStorePath:" + trustStorePath + ", " ++ + "socketTimeout:" + socketTimeout + ", " ++ + "eventBatchTimeout:" + eventBatchTimeout + ", " ++ + "ackPollInterval:" + ackPollInterval + ", " ++ + "ackPollThreads:" + ackPollThreads + ", " ++ + "maxHttpConnectionPerChannel:" + maxHttpConnPerChannel + ", " ++ + "totalHecChannels:" + totalHecChannels + ", " ++ + "enrichment: " + getString(ENRICHMENT_CONF) + ", " ++ + "maxBatchSize: " + maxBatchSize + ", " ++ + "numberOfThreads: " + numberOfThreads + ", " ++ + "lineBreaker: " + lineBreaker + ", " ++ + "maxOutstandingEvents: " + maxOutstandingEvents + ", " ++ + "maxRetries: " + maxRetries + ", " ++ + "useRecordTimestamp: " + useRecordTimestamp + ", " ++ + "trackData: " + trackData; ++ } ++ ++ private static String[] split(String data, String sep) { ++ if (data != null && !data.trim().isEmpty()) { ++ return data.trim().split(sep); + } ++ return null; ++ } + +- public String toString() { +- return "splunkURI:" + splunkURI + ", " +- + "raw:" + raw + ", " +- + "ack:" + ack + ", " +- + "indexes:" + indexes + ", " +- + "sourcetypes:" + sourcetypes + ", " +- + "sources:" + sources + ", " +- + "httpKeepAlive:" + httpKeepAlive + ", " +- + "validateCertificates:" + validateCertificates + ", " +- + "trustStorePath:" + trustStorePath + ", " 
+- + "socketTimeout:" + socketTimeout + ", " +- + "eventBatchTimeout:" + eventBatchTimeout + ", " +- + "ackPollInterval:" + ackPollInterval + ", " +- + "ackPollThreads:" + ackPollThreads + ", " +- + "maxHttpConnectionPerChannel:" + maxHttpConnPerChannel + ", " +- + "totalHecChannels:" + totalHecChannels + ", " +- + "enrichment: " + getString(ENRICHMENT_CONF) + ", " +- + "maxBatchSize: " + maxBatchSize + ", " +- + "numberOfThreads: " + numberOfThreads + ", " +- + "lineBreaker: " + lineBreaker + ", " +- + "maxOutstandingEvents: " + maxOutstandingEvents + ", " +- + "maxRetries: " + maxRetries + ", " +- + "useRecordTimestamp: " + useRecordTimestamp + ", " +- + "trackData: " + trackData; ++ private static Map parseEnrichments(String enrichment) { ++ String[] kvs = split(enrichment, ","); ++ if (kvs == null) { ++ return null; + } + +- private static String[] split(String data, String sep) { +- if (data != null && !data.trim().isEmpty()) { +- return data.trim().split(sep); +- } +- return null; ++ Map enrichmentKvs = new HashMap<>(); ++ for (final String kv : kvs) { ++ String[] kvPairs = split(kv, "="); ++ if (kvPairs.length != 2) { ++ throw new ConfigException("Invalid enrichment: " + enrichment + ". Expect key value pairs and separated by comma"); ++ } ++ enrichmentKvs.put(kvPairs[0], kvPairs[1]); + } ++ return enrichmentKvs; ++ } + +- private static Map parseEnrichments(String enrichment) { +- String[] kvs = split(enrichment, ","); +- if (kvs == null) { +- return null; +- } +- +- Map enrichmentKvs = new HashMap<>(); +- for (final String kv: kvs) { +- String[] kvPairs = split(kv, "="); +- if (kvPairs.length != 2) { +- throw new ConfigException("Invalid enrichment: " + enrichment+ ". Expect key value pairs and separated by comma"); +- } +- enrichmentKvs.put(kvPairs[0], kvPairs[1]); +- } +- return enrichmentKvs; ++ private String getMetaForTopic(List metas, int expectedLength, int curIdx, String confKey) { ++ if (metas == null || metas.size() == 0) { ++ return null; + } + +- private String getMetaForTopic(String[] metas, int expectedLength, int curIdx, String confKey) { +- if (metas == null) { +- return null; +- } +- +- if (metas.length == 1) { +- return metas[0]; +- } else if (metas.length == expectedLength) { +- return metas[curIdx]; +- } else { +- throw new ConfigException("Invalid " + confKey + " configuration=" + metas); +- } ++ if (metas.size() == 1) { ++ return metas.get(0); ++ } else if (metas.size() == expectedLength) { ++ return metas.get(curIdx); ++ } else { ++ throw new ConfigException("Invalid " + confKey + " configuration=" + metas); + } ++ } ++ ++ private Map> initMetaMap(Map taskConfig) { ++ List topics = getList(SinkTask.TOPICS_CONFIG); ++ List topicIndexes = getList(INDEXES_CONF); ++ List topicSourcetypes = getList(SOURCETYPES_CONF); ++ List topicSources = getList(SOURCES_CONF); + +- private Map> initMetaMap(Map taskConfig) { +- String[] topics = split(taskConfig.get(SinkConnector.TOPICS_CONFIG), ","); +- String[] topicIndexes = split(indexes, ","); +- String[] topicSourcetypes = split(sourcetypes, ","); +- String[] topicSources = split(sources, ","); +- +- Map> metaMap = new HashMap<>(); +- int idx = 0; +- for (String topic: topics) { +- HashMap topicMeta = new HashMap<>(); +- String meta = getMetaForTopic(topicIndexes, topics.length, idx, INDEX_CONF); +- if (meta != null) { +- topicMeta.put(INDEX, meta); +- } +- +- meta = getMetaForTopic(topicSourcetypes, topics.length, idx, SOURCETYPE_CONF); +- if (meta != null) { +- topicMeta.put(SOURCETYPE, meta); +- } +- +- meta = 
getMetaForTopic(topicSources, topics.length, idx, SOURCE_CONF); +- if (meta != null) { +- topicMeta.put(SOURCE, meta); +- } +- +- metaMap.put(topic, topicMeta); +- idx += 1; +- } +- return metaMap; ++ Map> metaMap = new HashMap<>(); ++ int idx = 0; ++ for (String topic : topics) { ++ HashMap topicMeta = new HashMap<>(); ++ String meta = getMetaForTopic(topicIndexes, topics.size(), idx, INDEXES_CONF); ++ if (meta != null) { ++ topicMeta.put(INDEX, meta); ++ } ++ ++ meta = getMetaForTopic(topicSourcetypes, topics.size(), idx, SOURCETYPES_CONF); ++ if (meta != null) { ++ topicMeta.put(SOURCETYPE, meta); ++ } ++ ++ meta = getMetaForTopic(topicSources, topics.size(), idx, SOURCES_CONF); ++ if (meta != null) { ++ topicMeta.put(SOURCE, meta); ++ } ++ ++ //"new style" config for topics overides the old ++ Map topicConfigs = originalsWithPrefix(SinkTask.TOPICS_CONFIG + "." + topic + "."); ++ String index = topicConfigs.getOrDefault(INDEX_CONF, getString(INDEX_CONF)).toString(); ++ if (StringUtils.isNotBlank(index)) topicMeta.put(INDEX, index); ++ ++ String sourcetype = topicConfigs.getOrDefault(SOURCETYPE_CONF, getString(SOURCETYPE_CONF)).toString(); ++ if (StringUtils.isNotBlank(sourcetype)) topicMeta.put(SOURCETYPE, sourcetype); ++ ++ String source = topicConfigs.getOrDefault(SOURCE_CONF, getString(SOURCE_CONF)).toString(); ++ if (StringUtils.isNotBlank(source)) topicMeta.put(SOURCE, source); ++ ++ String host = topicConfigs.getOrDefault(HOST_CONF, getString(HOST_CONF)).toString(); ++ if (StringUtils.isNotBlank(host)) topicMeta.put(HOST, host); ++ ++ metaMap.put(topic, topicMeta); ++ idx += 1; ++ + } ++ ++ ++ return metaMap; ++ } + } +diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +index 8a76868..e9eee9f 100644 +--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java ++++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +@@ -17,12 +17,17 @@ package com.splunk.kafka.connect; + + import com.splunk.hecclient.*; + import com.splunk.kafka.connect.VersionUtils; ++ ++import org.apache.commons.lang3.StringUtils; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; + import org.apache.kafka.common.TopicPartition; ++import org.apache.kafka.connect.data.Schema; + import org.apache.kafka.connect.errors.RetriableException; + import org.apache.kafka.connect.sink.SinkRecord; + import org.apache.kafka.connect.sink.SinkTask; + ++import java.time.Instant; ++import java.time.LocalDateTime; + import java.util.*; + + import org.slf4j.Logger; +@@ -139,17 +144,13 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback { + } + + private void handleRaw(final Collection records) { +- if (connectorConfig.hasMetaDataConfigured()) { +- // when setup metadata - index, source, sourcetype, we need partition records for /raw +- Map> partitionedRecords = partitionRecords(records); +- for (Map.Entry> entry: partitionedRecords.entrySet()) { +- EventBatch batch = createRawEventBatch(entry.getKey()); +- sendEvents(entry.getValue(), batch); +- } +- } else { +- EventBatch batch = createRawEventBatch(null); +- sendEvents(records, batch); +- } ++ ++ Map> partitionedRecords = partitionRecords(records); ++ // partition records based on calculated index, source, sourcetype, host ++ for (Map.Entry> entry : partitionedRecords.entrySet()) { ++ sendEvents(entry.getValue(), entry.getKey()); ++ } ++ + } + + private void handleEvent(final Collection records) { +@@ -194,24 +195,6 @@ public final class 
SplunkSinkTask extends SinkTask implements PollerCallback { + } + } + +- // setup metadata on RawEventBatch +- private EventBatch createRawEventBatch(final TopicPartition tp) { +- if (tp == null) { +- return RawEventBatch.factory().build(); +- } +- +- Map metas = connectorConfig.topicMetas.get(tp.topic()); +- if (metas == null || metas.isEmpty()) { +- return RawEventBatch.factory().build(); +- } +- +- return RawEventBatch.factory() +- .setIndex(metas.get(SplunkSinkConnectorConfig.INDEX)) +- .setSourcetype(metas.get(SplunkSinkConnectorConfig.SOURCETYPE)) +- .setSource(metas.get(SplunkSinkConnectorConfig.SOURCE)) +- .build(); +- } +- + @Override + public Map preCommit(Map meta) { + // tell Kafka Connect framework what are offsets we can safely commit to Kafka now +@@ -260,13 +243,36 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback { + event.setTime(record.timestamp() / 1000.0); + } + ++ + Map metas = connectorConfig.topicMetas.get(record.topic()); + if (metas != null) { + event.setIndex(metas.get(SplunkSinkConnectorConfig.INDEX)); + event.setSourcetype(metas.get(SplunkSinkConnectorConfig.SOURCETYPE)); + event.setSource(metas.get(SplunkSinkConnectorConfig.SOURCE)); ++ event.setHost(metas.get(SplunkSinkConnectorConfig.HOST)); + event.addFields(connectorConfig.enrichments); + } ++ ++ //overwrite with values from headers ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.INDEX_HDR) != null) { ++ event.setIndex(record.headers().lastWithName(SplunkSinkConnectorConfig.INDEX_HDR).value().toString()); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCETYPE_HDR) != null) { ++ event.setSourcetype(record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCETYPE_HDR).value().toString()); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCE_HDR) != null) { ++ event.setSource(record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCE_HDR).value().toString()); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.HOST_HDR) != null) { ++ event.setHost(record.headers().lastWithName(SplunkSinkConnectorConfig.HOST_HDR).value().toString()); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.TIME_HDR) != null) { ++ long time = Long.valueOf(record.headers().lastWithName(SplunkSinkConnectorConfig.TIME_HDR).value().toString()); ++ event.setTime(time/1000); ++ ++ } ++ ++ + + if (connectorConfig.trackData) { + // for data loss, latency tracking +@@ -304,19 +310,73 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback { + } + + // partition records according to topic-partition key +- private Map> partitionRecords(Collection records) { +- Map> partitionedRecords = new HashMap<>(); +- +- for (SinkRecord record: records) { +- TopicPartition key = new TopicPartition(record.topic(), record.kafkaPartition()); +- Collection partitioned = partitionedRecords.get(key); +- if (partitioned == null) { +- partitioned = new ArrayList<>(); +- partitionedRecords.put(key, partitioned); +- } +- partitioned.add(record); ++ private Map> partitionRecords(Collection records) { ++ Map> partitionedRecords = new HashMap<>(); ++ ++ for (SinkRecord record : records) { ++ EventBatch batch = getBatchForRecord(record); ++ Collection partitioned = partitionedRecords.get(batch); ++ if (partitioned == null) { ++ partitioned = new ArrayList<>(); ++ partitionedRecords.put(batch, partitioned); + } +- return partitionedRecords; ++ partitioned.add(record); ++ } ++ return partitionedRecords; ++ } ++ ++ private 
EventBatch getBatchForRecord(SinkRecord record) { ++ //get metadata in the order: ++ // 1. defaults (splunk.index, splunk.sourcetype, splunk.source etc ) ++ // 2. configured topic metas ++ // 3. from record headers ++ ++ String index = connectorConfig.index; ++ String sourcetype= connectorConfig.sourcetype; ++ String source= connectorConfig.source; ++ String host= connectorConfig.host; ++ ++ ++ if (StringUtils.isNotBlank(connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.INDEX, null))) { ++ index = connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.INDEX, null); ++ } ++ if (StringUtils.isNotBlank(connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.SOURCETYPE, null))) { ++ sourcetype = connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.SOURCETYPE, null); ++ } ++ ++ if (StringUtils.isNotBlank(connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.SOURCE, null))) { ++ source = connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.SOURCE, null); ++ } ++ if (StringUtils.isNotBlank(connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.HOST, null))) { ++ host = connectorConfig.topicMetas.getOrDefault(record.topic(), Collections.emptyMap()).getOrDefault(SplunkSinkConnectorConfig.HOST, null); ++ } ++ ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.INDEX_HDR) != null) { ++ index = record.headers().lastWithName(SplunkSinkConnectorConfig.INDEX_HDR).value().toString(); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCETYPE_HDR) != null) { ++ sourcetype = record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCETYPE_HDR).value().toString(); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCE_HDR) != null) { ++ source = record.headers().lastWithName(SplunkSinkConnectorConfig.SOURCE_HDR).value().toString(); ++ } ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.HOST_HDR) != null) { ++ host = record.headers().lastWithName(SplunkSinkConnectorConfig.HOST_HDR).value().toString(); ++ } ++ ++ long time = -1; ++ if (record.headers().lastWithName(SplunkSinkConnectorConfig.TIME_HDR) != null) { ++ time = Long.valueOf(record.headers().lastWithName(SplunkSinkConnectorConfig.TIME_HDR).value().toString()); ++ } ++ ++ ++ return RawEventBatch.factory() ++ .setHost(host) ++ .setIndex(index) ++ .setSource(source) ++ .setSourcetype(sourcetype) ++ .setTime(time) ++ .build(); + } + + private HecInf createHec() { +diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java +index 769f403..f82eaa7 100644 +--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java ++++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java +@@ -144,9 +144,9 @@ public class SplunkSinkConnectorConfigTest { + UnitUtil uu = new UnitUtil(0); + Map config = uu.createTaskConfig(); + config.put(SinkConnector.TOPICS_CONFIG, "t1,t2,t3"); +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, "i1,i2,i3"); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, "s1,s2,s3"); +- 
config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "e1,e2,e3"); ++ config.put(SplunkSinkConnectorConfig.INDEXES_CONF, "i1,i2,i3"); ++ config.put(SplunkSinkConnectorConfig.SOURCES_CONF, "s1,s2,s3"); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "e1,e2,e3"); + SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config); + + Map> topicMetas = new HashMap<>(); +@@ -169,9 +169,9 @@ public class SplunkSinkConnectorConfigTest { + // one index, multiple source, source types + Map config = uu.createTaskConfig(); + config.put(SinkConnector.TOPICS_CONFIG, "t1,t2,t3"); +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, "i1"); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, "s1,s2,s3"); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "e1,e2,e3"); ++ config.put(SplunkSinkConnectorConfig.INDEXES_CONF, "i1"); ++ config.put(SplunkSinkConnectorConfig.SOURCES_CONF, "s1,s2,s3"); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "e1,e2,e3"); + SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config); + + Map> topicMetas = new HashMap<>(); +@@ -194,24 +194,24 @@ public class SplunkSinkConnectorConfigTest { + // index, source, sourcetypes + Map config = uu.createTaskConfig(); + config.put(SinkConnector.TOPICS_CONFIG, "t1"); +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, "i1"); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, "s1"); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "e1"); ++ config.put(SplunkSinkConnectorConfig.INDEXES_CONF, "i1"); ++ config.put(SplunkSinkConnectorConfig.SOURCES_CONF, "s1"); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "e1"); + SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config); + Assert.assertTrue(connectorConfig.hasMetaDataConfigured()); + + // source, sourcetype + config = uu.createTaskConfig(); + config.put(SinkConnector.TOPICS_CONFIG, "t1"); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, "s1"); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "e1"); ++ config.put(SplunkSinkConnectorConfig.SOURCES_CONF, "s1"); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "e1"); + connectorConfig = new SplunkSinkConnectorConfig(config); + Assert.assertTrue(connectorConfig.hasMetaDataConfigured()); + + // sourcetype + config = uu.createTaskConfig(); + config.put(SinkConnector.TOPICS_CONFIG, "t1"); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "e1"); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "e1"); + connectorConfig = new SplunkSinkConnectorConfig(config); + Assert.assertTrue(connectorConfig.hasMetaDataConfigured()); + } +@@ -223,9 +223,9 @@ public class SplunkSinkConnectorConfigTest { + // one index, multiple source, sourcetypes + Map config = uu.createTaskConfig(); + config.put(SinkConnector.TOPICS_CONFIG, "t1,t2,t3"); +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, "i1,i2"); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, "s1,s2,s3"); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "e1,e2,e3"); ++ config.put(SplunkSinkConnectorConfig.INDEXES_CONF, "i1,i2"); ++ config.put(SplunkSinkConnectorConfig.SOURCES_CONF, "s1,s2,s3"); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "e1,e2,e3"); + SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config); + } + +diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +index 59c3417..bf3619e 100644 +--- 
a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java ++++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition; + import org.apache.kafka.common.record.TimestampType; + import org.apache.kafka.connect.errors.RetriableException; + import org.apache.kafka.connect.sink.SinkRecord; ++import org.apache.kafka.connect.sink.SinkTask; + import org.junit.Assert; + import org.junit.Test; + +@@ -64,8 +65,18 @@ public class SplunkSinkTaskTest { + + @Test + public void putWithEventAndAck() { +- putWithSuccess(false, true); +- putWithSuccess(false, false); ++ Map extraConf = new HashMap<>(); ++ extraConf.put(SplunkSinkConnectorConfig.INDEXES_CONF, "i1"); ++ extraConf.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "s1"); ++ extraConf.put(SplunkSinkConnectorConfig.SOURCES_CONF, "e1"); ++ putWithSuccess(true, true, extraConf); ++ ++ Map conf = new HashMap<>(); ++ conf.put(SplunkSinkConnectorConfig.INDEXES_CONF, ""); ++ conf.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, ""); ++ conf.put(SplunkSinkConnectorConfig.SOURCES_CONF, ""); ++ putWithSuccess(true, false, conf); ++ + } + + @Test +@@ -218,15 +229,38 @@ public class SplunkSinkTaskTest { + + @Test + public void putWithRawAndAck() { +- putWithSuccess(true, true); ++ Map extraConf = new HashMap<>(); ++ extraConf.put(SplunkSinkConnectorConfig.INDEXES_CONF, "i1"); ++ extraConf.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, "s1"); ++ extraConf.put(SplunkSinkConnectorConfig.SOURCES_CONF, "e1"); ++ putWithSuccess(true, true, extraConf); ++ ++ ++ } ++ ++ @Test ++ public void putWithRawAndAckAndNewMeta() { ++ Map extraConf = new HashMap<>(); ++ extraConf.put(SinkTask.TOPICS_CONFIG, "mytopic"); ++ extraConf.put(SinkTask.TOPICS_CONFIG+".mytopic."+ SplunkSinkConnectorConfig.INDEX_CONF, "i1"); ++ extraConf.put(SinkTask.TOPICS_CONFIG+".mytopic."+ SplunkSinkConnectorConfig.SOURCETYPE_CONF, "s1"); ++ extraConf.put(SinkTask.TOPICS_CONFIG+".mytopic."+ SplunkSinkConnectorConfig.SOURCE_CONF, "e1"); ++ putWithSuccess(true, true, extraConf); ++ ++ + } ++ + + @Test + public void putWithRawAndAckWithoutMeta() { +- putWithSuccess(true, false); ++ Map conf = new HashMap<>(); ++ conf.put(SplunkSinkConnectorConfig.INDEXES_CONF, ""); ++ conf.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, ""); ++ conf.put(SplunkSinkConnectorConfig.SOURCES_CONF, ""); ++ putWithSuccess(true, false, conf); + } + +- private void putWithSuccess(boolean raw, boolean withMeta) { ++ private void putWithSuccess(boolean raw, boolean withMeta, Map conf) { + int batchSize = 100; + int total = 1000; + +@@ -235,15 +269,8 @@ public class SplunkSinkTaskTest { + config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(raw)); + config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true)); + config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(batchSize)); +- if (withMeta) { +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, "i1"); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, "s1"); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, "e1"); +- } else { +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, ""); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, ""); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, ""); +- } ++ config.putAll(conf); ++ + + SplunkSinkTask task = new SplunkSinkTask(); + HecMock hec = new HecMock(task); +diff --git a/src/test/java/com/splunk/kafka/connect/UnitUtil.java b/src/test/java/com/splunk/kafka/connect/UnitUtil.java +index ff870f0..96b4ae8 
100644 +--- a/src/test/java/com/splunk/kafka/connect/UnitUtil.java ++++ b/src/test/java/com/splunk/kafka/connect/UnitUtil.java +@@ -35,9 +35,9 @@ public class UnitUtil { + config.put(SplunkSinkConnectorConfig.URI_CONF, configProfile.getUri()); + config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(configProfile.isRaw())); + config.put(SplunkSinkConnectorConfig.ACK_CONF , String.valueOf(configProfile.isAck())); +- config.put(SplunkSinkConnectorConfig.INDEX_CONF, configProfile.getIndexes()); +- config.put(SplunkSinkConnectorConfig.SOURCETYPE_CONF, configProfile.getSourcetypes()); +- config.put(SplunkSinkConnectorConfig.SOURCE_CONF, configProfile.getSources()); ++ config.put(SplunkSinkConnectorConfig.INDEXES_CONF, configProfile.getIndexes()); ++ config.put(SplunkSinkConnectorConfig.SOURCETYPES_CONF, configProfile.getSourcetypes()); ++ config.put(SplunkSinkConnectorConfig.SOURCES_CONF, configProfile.getSources()); + config.put(SplunkSinkConnectorConfig.HTTP_KEEPALIVE_CONF, String.valueOf(configProfile.isHttpKeepAlive())); + config.put(SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF, String.valueOf(configProfile.isValidateCertificates())); + +-- +2.8.3.windows.1 + diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java index 0fcfafa9..832f6af7 100644 --- a/src/main/java/com/splunk/hecclient/HecAckPoller.java +++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java @@ -209,9 +209,9 @@ public void stickySessionHandler(HecChannel channel) { log.info("Failing {} batches for the channel {}, these will be resent by the connector.", channelBatches.size(), oldChannelId); if (pollerCallback != null) { List expired = new ArrayList<>(); - Iterator iter = channelBatches.entrySet().iterator(); + Iterator> iter = channelBatches.entrySet().iterator(); while(iter.hasNext()) { - Map.Entry pair = (Map.Entry) iter.next(); + Map.Entry pair = iter.next(); EventBatch batch = pair.getValue(); totalOutstandingEventBatches.decrementAndGet(); batch.fail(); diff --git a/src/main/java/com/splunk/hecclient/JsonEvent.java b/src/main/java/com/splunk/hecclient/JsonEvent.java index 983e3f3c..da24c8e7 100644 --- a/src/main/java/com/splunk/hecclient/JsonEvent.java +++ b/src/main/java/com/splunk/hecclient/JsonEvent.java @@ -127,7 +127,7 @@ public String toString() { return jsonMapper.writeValueAsString(this); } catch (Exception ex) { log.error("failed to json serlized JsonEvent", ex); - throw new HecException("failed to json serlized JsonEvent", ex); + throw new HecException("failed to json serialized JsonEvent", ex); } } diff --git a/src/main/java/com/splunk/hecclient/JsonEventBatch.java b/src/main/java/com/splunk/hecclient/JsonEventBatch.java index 1f7f45af..26978279 100644 --- a/src/main/java/com/splunk/hecclient/JsonEventBatch.java +++ b/src/main/java/com/splunk/hecclient/JsonEventBatch.java @@ -15,6 +15,8 @@ */ package com.splunk.hecclient; +import org.apache.commons.lang3.builder.HashCodeBuilder; + public final class JsonEventBatch extends EventBatch { public static final String endpoint = "/services/collector/event"; public static final String contentType = "application/json; profile=urn:splunk:event:1.0; charset=utf-8"; @@ -43,4 +45,20 @@ public String getContentType() { public EventBatch createFromThis() { return new JsonEventBatch(); } -} + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(endpoint) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof JsonEventBatch) 
{ + final JsonEventBatch other = (JsonEventBatch) obj; + return this.endpoint.equals(other.endpoint); + } + return false; + } +} \ No newline at end of file diff --git a/src/main/java/com/splunk/hecclient/RawEventBatch.java b/src/main/java/com/splunk/hecclient/RawEventBatch.java index a5fbc47f..860d0090 100644 --- a/src/main/java/com/splunk/hecclient/RawEventBatch.java +++ b/src/main/java/com/splunk/hecclient/RawEventBatch.java @@ -15,6 +15,8 @@ */ package com.splunk.hecclient; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.http.client.utils.URIBuilder; public final class RawEventBatch extends EventBatch { @@ -87,7 +89,7 @@ public Builder setHost(final String host) { return this; } - public Builder setTime(final int time) { + public Builder setTime(final long time) { this.time = time; return this; } @@ -146,4 +148,28 @@ private static void putIfPresent(String val, String tag, URIBuilder params) { params.addParameter(tag, val); } } -} + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(index) + .append(sourcetype) + .append(source) + .append(host) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RawEventBatch) { + final RawEventBatch other = (RawEventBatch) obj; + return new EqualsBuilder() + .append(index, other.index) + .append(sourcetype, other.sourcetype) + .append(source, other.source) + .append(host, other.host) + .isEquals(); + } + return false; + } +} \ No newline at end of file diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 25d2c1f2..0299b075 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -36,6 +36,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String INDEX_CONF = "splunk.indexes"; static final String SOURCE_CONF = "splunk.sources"; static final String SOURCETYPE_CONF = "splunk.sourcetypes"; + static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels"; static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel"; static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count @@ -59,9 +60,17 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String USE_RECORD_TIMESTAMP_CONF = "splunk.hec.use.record.timestamp"; static final String ENRICHMENT_CONF = "splunk.hec.json.event.enrichment"; static final String TRACK_DATA_CONF = "splunk.hec.track.data"; - // TBD + static final String HEC_EVENT_FORMATTED_CONF = "splunk.hec.json.event.formatted"; + // Trust store static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path"; static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password"; + //Headers + static final String HEADER_SUPPORT_CONF = "splunk.header.support"; + static final String HEADER_CUSTOM_CONF = "splunk.header.custom"; + static final String HEADER_INDEX_CONF = "splunk.header.index"; + static final String HEADER_SOURCE_CONF = "splunk.header.source"; + static final String HEADER_SOURCETYPE_CONF = "splunk.header.sourcetype"; + static final String HEADER_HOST_CONF = "splunk.header.host"; // Kafka configuration description strings // Required Parameters @@ -78,7 +87,8 @@ public final class 
SplunkSinkConnectorConfig extends AbstractConfig { + " the HEC token. By default, this setting is empty."; static final String SOURCETYPE_DOC = "Splunk event sourcetype metadata for Kafka topic data. The same configuration " + "rules as indexes can be applied here. If left unconfigured, the default source" - + " binds to the HEC token. By default, this setting is empty"; + + " binds to the HEC token. By default, this setting is empty" + + "through to splunk. Only use with JSON Event endpoint"; static final String TOTAL_HEC_CHANNEL_DOC = "Total HEC Channels used to post events to Splunk. When enabling HEC ACK, " + "setting to the same or 2X number of indexers is generally good."; static final String MAX_HTTP_CONNECTION_PER_CHANNEL_DOC = "Max HTTP connections pooled for one HEC Channel " @@ -136,10 +146,21 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { + "latency metadata will be indexed along with raw data. This setting only works in " + "conjunction with /event HEC endpoint (\"splunk.hec.raw\" : \"false\"). By default" + ", this is set to false."; + static final String HEC_EVENT_FORMATTED_DOC = "Ensures events that are pre-formatted into the properly formatted HEC " + + "JSON format as per http://dev.splunk.com/view/event-collector/SP-CAAAE6P have meta-data and event data indexed " + + "correctly by Splunk."; // TBD static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store."; static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store."; + static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override"; + static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them" + + "to each event if present. Custom headers are configured separated by comma for multiple headers. 
ex, (\"custom_header_1,custom_header_2,custom_header_3\")."; + static final String HEADER_INDEX_DOC = "Header to use for Splunk Header Index"; + static final String HEADER_SOURCE_DOC = "Header to use for Splunk Header Source"; + static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype"; + static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host"; + final String splunkToken; final String splunkURI; final Map> topicMetas; @@ -164,6 +185,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final int maxRetries; final boolean raw; + final boolean hecEventFormatted; + final String lineBreaker; final boolean useRecordTimestamp; final Map enrichments; @@ -173,6 +196,13 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String trustStorePath; final String trustStorePassword; + final boolean headerSupport; + final String headerCustom; + final String headerIndex; + final String headerSource; + final String headerSourcetype; + final String headerHost; + SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); splunkToken = getPassword(TOKEN_CONF).value(); @@ -201,55 +231,68 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { lineBreaker = getString(LINE_BREAKER_CONF); maxOutstandingEvents = getInt(MAX_OUTSTANDING_EVENTS_CONF); maxRetries = getInt(MAX_RETRIES_CONF); + hecEventFormatted = getBoolean(HEC_EVENT_FORMATTED_CONF); topicMetas = initMetaMap(taskConfig); + headerSupport = getBoolean(HEADER_SUPPORT_CONF); + headerCustom = getString(HEADER_CUSTOM_CONF); + headerIndex = getString(HEADER_INDEX_CONF); + headerSource = getString(HEADER_SOURCE_CONF); + headerSourcetype = getString(HEADER_SOURCETYPE_CONF); + headerHost = getString(HEADER_HOST_CONF); } public static ConfigDef conf() { return new ConfigDef() - .define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC) - .define(URI_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URI_DOC) - .define(RAW_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, RAW_DOC) - .define(ACK_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ACK_DOC) - .define(INDEX_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, INDEX_DOC) - .define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC) - .define(SOURCE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC) - .define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC) - .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC) - .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) - .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) - .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) - .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) - .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC) - .define(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, MAX_HTTP_CONNECTION_PER_CHANNEL_DOC) - .define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC) - 
.define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) - .define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ENRICHMENT_DOC) - .define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC) - .define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC) - .define(HEC_THREDS_CONF, ConfigDef.Type.INT, 1, ConfigDef.Importance.LOW, HEC_THREADS_DOC) - .define(LINE_BREAKER_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, LINE_BREAKER_DOC) - .define(MAX_OUTSTANDING_EVENTS_CONF, ConfigDef.Type.INT, 1000000, ConfigDef.Importance.MEDIUM, MAX_OUTSTANDING_EVENTS_DOC) - .define(MAX_RETRIES_CONF, ConfigDef.Type.INT, -1, ConfigDef.Importance.MEDIUM, MAX_RETRIES_DOC) - .define(MAX_BATCH_SIZE_CONF, ConfigDef.Type.INT, 500, ConfigDef.Importance.MEDIUM, MAX_BATCH_SIZE_DOC); + .define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC) + .define(URI_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URI_DOC) + .define(RAW_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, RAW_DOC) + .define(ACK_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ACK_DOC) + .define(INDEX_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, INDEX_DOC) + .define(SOURCETYPE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCETYPE_DOC) + .define(SOURCE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, SOURCE_DOC) + .define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC) + .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC) + .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) + .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) + .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) + .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) + .define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC) + .define(MAX_HTTP_CONNECTION_PER_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, MAX_HTTP_CONNECTION_PER_CHANNEL_DOC) + .define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC) + .define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC) + .define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ENRICHMENT_DOC) + .define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC) + .define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC) + .define(HEC_THREDS_CONF, ConfigDef.Type.INT, 1, ConfigDef.Importance.LOW, HEC_THREADS_DOC) + .define(LINE_BREAKER_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, LINE_BREAKER_DOC) + .define(MAX_OUTSTANDING_EVENTS_CONF, ConfigDef.Type.INT, 1000000, ConfigDef.Importance.MEDIUM, MAX_OUTSTANDING_EVENTS_DOC) + .define(MAX_RETRIES_CONF, ConfigDef.Type.INT, -1, ConfigDef.Importance.MEDIUM, MAX_RETRIES_DOC) + .define(HEC_EVENT_FORMATTED_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, 
HEC_EVENT_FORMATTED_DOC) + .define(MAX_BATCH_SIZE_CONF, ConfigDef.Type.INT, 500, ConfigDef.Importance.MEDIUM, MAX_BATCH_SIZE_DOC) + .define(HEADER_SUPPORT_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, HEADER_SUPPORT_DOC) + .define(HEADER_CUSTOM_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, HEADER_CUSTOM_DOC) + .define(HEADER_INDEX_CONF, ConfigDef.Type.STRING, "splunk.header.index", ConfigDef.Importance.MEDIUM, HEADER_INDEX_DOC) + .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) + .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) + .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC); } - /** - Configuration Method to setup all settings related to Splunk HEC Client + Configuration Method to setup all settings related to Splunk HEC Client */ public HecConfig getHecConfig() { HecConfig config = new HecConfig(Arrays.asList(splunkURI.split(",")), splunkToken); config.setDisableSSLCertVerification(!validateCertificates) - .setSocketTimeout(socketTimeout) - .setMaxHttpConnectionPerChannel(maxHttpConnPerChannel) - .setTotalChannels(totalHecChannels) - .setEventBatchTimeout(eventBatchTimeout) - .setHttpKeepAlive(httpKeepAlive) - .setAckPollInterval(ackPollInterval) - .setAckPollThreads(ackPollThreads) - .setEnableChannelTracking(trackData) - .setTrustStorePath(trustStorePath) - .setTrustStorePassword(trustStorePassword) - .setHasCustomTrustStore(hasTrustStorePath); + .setSocketTimeout(socketTimeout) + .setMaxHttpConnectionPerChannel(maxHttpConnPerChannel) + .setTotalChannels(totalHecChannels) + .setEventBatchTimeout(eventBatchTimeout) + .setHttpKeepAlive(httpKeepAlive) + .setAckPollInterval(ackPollInterval) + .setAckPollThreads(ackPollThreads) + .setEnableChannelTracking(trackData) + .setTrustStorePath(trustStorePath) + .setTrustStorePassword(trustStorePassword) + .setHasCustomTrustStore(hasTrustStorePath); return config; } @@ -266,6 +309,8 @@ public String toString() { + "indexes:" + indexes + ", " + "sourcetypes:" + sourcetypes + ", " + "sources:" + sources + ", " + + "headerSupport:" + headerSupport + ", " + + "headerCustom:" + headerCustom + ", " + "httpKeepAlive:" + httpKeepAlive + ", " + "validateCertificates:" + validateCertificates + ", " + "trustStorePath:" + trustStorePath + ", " @@ -282,7 +327,14 @@ public String toString() { + "maxOutstandingEvents: " + maxOutstandingEvents + ", " + "maxRetries: " + maxRetries + ", " + "useRecordTimestamp: " + useRecordTimestamp + ", " - + "trackData: " + trackData; + + "hecEventFormatted" + hecEventFormatted + ", " + + "trackData: " + trackData + "," + + "headerSupport:" + headerSupport + "," + + "headerCustom:" + headerCustom + "," + + "headerIndex:" + headerIndex + "," + + "headerSource:" + headerSource + "," + + "headerSourcetype:" + headerSourcetype + "," + + "headerHost" + headerHost; } private static String[] split(String data, String sep) { diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java new file mode 100644 index 00000000..7a566115 --- /dev/null +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkRecord.java @@ -0,0 +1,172 @@ +/* + * Copyright 2017-2018 Splunk, Inc.. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.splunk.kafka.connect; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SplunkSinkRecord provides helper functionality to enable Header support for the Splunk Connect for Kafka, Namely + * Header functionality introspection and comparison. + *

+ * + * @version 1.1.0 + * @since 1.1.0 + */ +public class SplunkSinkRecord { + private static final Logger log = LoggerFactory.getLogger(SplunkSinkRecord.class); + Headers headers; + SplunkSinkConnectorConfig connectorConfig; + String splunkHeaderIndex = ""; + String splunkHeaderHost = ""; + String splunkHeaderSource = ""; + String splunkHeaderSourcetype = ""; + + public SplunkSinkRecord() {} + + /** + * Creates a new Kafka Header utility object. Will take a Kafka SinkRecord and Splunk Sink Connector configuration + * and create the object based on Headers included with the Kafka Record. + * + * @param record Kafka SinkRecord to be introspected and headers retrieved from. + * @param connectorConfig Splunk Connector configuration used to determine headers of importance + * @version 1.1.0 + * @since 1.1.0 + */ + public SplunkSinkRecord(SinkRecord record, SplunkSinkConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + this.headers = record.headers(); + if(this.headers != null) { + setMetadataValues(); + } + } + + /** + * CompareRecordHeaders will compare a SinkRecord's Header values against values that have already populated the + * Kafka Header Utility object. This is used in batching events with the same meta-data values while using the /raw + * event endpoint in Splunk + * + * @param record Kafka SinkRecord to be introspected and headers retrieved from. + * @version 1.1.0 + * @since 1.1.0 + */ + protected boolean compareRecordHeaders(SinkRecord record) { + headers = record.headers(); + + Header indexHeader = headers.lastWithName(connectorConfig.headerIndex); + Header hostHeader = headers.lastWithName(connectorConfig.headerHost); + Header sourceHeader = headers.lastWithName(connectorConfig.headerSource); + Header sourcetypeHeader = headers.lastWithName(connectorConfig.headerSourcetype); + + String index = ""; + String host = ""; + String source = ""; + String sourcetype = ""; + + if(indexHeader != null) { + index = indexHeader.value().toString(); + } + if(hostHeader != null) { + host = hostHeader.value().toString(); + } + if(sourceHeader != null) { + source = sourceHeader.value().toString(); + } + if(sourcetypeHeader != null) { + sourcetype = sourcetypeHeader.value().toString(); + } + + return splunkHeaderIndex.equals(index) && splunkHeaderHost.equals(host) && + splunkHeaderSource.equals(source) && splunkHeaderSourcetype.equals(sourcetype); + } + + private void setMetadataValues() { + Header indexHeader = this.headers.lastWithName(connectorConfig.headerIndex); + Header hostHeader = this.headers.lastWithName(connectorConfig.headerHost); + Header sourceHeader = this.headers.lastWithName(connectorConfig.headerSource); + Header sourcetypeHeader = this.headers.lastWithName(connectorConfig.headerSourcetype); + + if(indexHeader != null) { + splunkHeaderIndex = indexHeader.value().toString(); + } + if(hostHeader != null) { + splunkHeaderHost = hostHeader.value().toString(); + } + if(sourceHeader != null) { + splunkHeaderSource = sourceHeader.value().toString(); + } + if(sourcetypeHeader != null) { + splunkHeaderSourcetype = sourcetypeHeader.value().toString(); + } + } + + public String id() { + String separator = "$$$"; + return new StringBuilder() + .append(splunkHeaderIndex) + .append(separator) + .append(splunkHeaderHost) + .append(separator) + .append(splunkHeaderSource) + .append(separator) + .append(splunkHeaderSourcetype) + .toString(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(splunkHeaderIndex) + .append(splunkHeaderHost) +
.append(splunkHeaderSource) + .append(splunkHeaderSourcetype) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SplunkSinkRecord) { + final SplunkSinkRecord other = (SplunkSinkRecord) obj; + return id().equals(other.id()); + } + return false; + } + + public Headers getHeaders() { + return headers; + } + + public String getSplunkHeaderIndex() { + return splunkHeaderIndex; + } + + public String getSplunkHeaderHost() { + return splunkHeaderHost; + } + + public String getSplunkHeaderSource() { + return splunkHeaderSource; + } + + public String getSplunkHeaderSourcetype() { + return splunkHeaderSourcetype; + } +} \ No newline at end of file diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 8a768686..9b2c4dae 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -16,12 +16,14 @@ package com.splunk.kafka.connect; import com.splunk.hecclient.*; -import com.splunk.kafka.connect.VersionUtils; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.header.Header; import java.util.*; @@ -31,6 +33,7 @@ public final class SplunkSinkTask extends SinkTask implements PollerCallback { private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class); private static final long flushWindow = 30 * 1000; // 30 seconds + private static final String HEADERTOKEN = "$$$"; private HecInf hec; private KafkaRecordTracker tracker; @@ -139,7 +142,9 @@ private void preventTooManyOutstandingEvents() { } private void handleRaw(final Collection<SinkRecord> records) { - if (connectorConfig.hasMetaDataConfigured()) { + if(connectorConfig.headerSupport) { + if(records != null) { handleRecordsWithHeader(records); } + } else if (connectorConfig.hasMetaDataConfigured()) { // when setup metadata - index, source, sourcetype, we need partition records for /raw Map<TopicPartition, Collection<SinkRecord>> partitionedRecords = partitionRecords(records); for (Map.Entry<TopicPartition, Collection<SinkRecord>> entry: partitionedRecords.entrySet()) { @@ -152,6 +157,69 @@ private void handleRaw(final Collection<SinkRecord> records) { } } + private void handleRecordsWithHeader(final Collection<SinkRecord> records) { + HashMap<String, ArrayList<SinkRecord>> recordsWithSameHeaders = new HashMap<>(); + + for (SinkRecord record : records) { + String key = headerId(record); + if (!recordsWithSameHeaders.containsKey(key)) { + ArrayList<SinkRecord> recordList = new ArrayList<>(); + recordsWithSameHeaders.put(key, recordList); + } + ArrayList<SinkRecord> recordList = recordsWithSameHeaders.get(key); + recordList.add(record); + } + + Iterator<Map.Entry<String, ArrayList<SinkRecord>>> itr = recordsWithSameHeaders.entrySet().iterator(); + while(itr.hasNext()) { + Map.Entry set = itr.next(); + String splunkSinkRecordKey = (String)set.getKey(); + ArrayList<SinkRecord> recordArrayList = (ArrayList<SinkRecord>)set.getValue(); + EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey); + sendEvents(recordArrayList, batch); + } + log.debug("{} records have been bucketed in to {} batches", records.size(), recordsWithSameHeaders.size()); + } + + public String headerId(SinkRecord sinkRecord) { + Headers headers = sinkRecord.headers(); + + Header indexHeader = headers.lastWithName(connectorConfig.headerIndex); + Header
hostHeader = headers.lastWithName(connectorConfig.headerHost); + Header sourceHeader = headers.lastWithName(connectorConfig.headerSource); + Header sourcetypeHeader = headers.lastWithName(connectorConfig.headerSourcetype); + + StringBuilder headerString = new StringBuilder(); + + if(indexHeader != null) { + headerString.append(indexHeader.value().toString()); + } + + headerString.append(insertHeaderToken()); + + if(hostHeader != null) { + headerString.append(hostHeader.value().toString()); + } + + headerString.append(insertHeaderToken()); + + if(sourceHeader != null) { + headerString.append(sourceHeader.value().toString()); + } + + headerString.append(insertHeaderToken()); + + if(sourcetypeHeader != null) { + headerString.append(sourcetypeHeader.value().toString()); + } + + return headerString.toString(); + } + + public String insertHeaderToken() { + return HEADERTOKEN; + } + private void handleEvent(final Collection records) { EventBatch batch = new JsonEventBatch(); sendEvents(records, batch); @@ -194,6 +262,17 @@ private void send(final EventBatch batch) { } } + private EventBatch createRawHeaderEventBatch(String splunkSinkRecord) { + String[] split = splunkSinkRecord.split("[$]{3}"); + + return RawEventBatch.factory() + .setIndex(split[0]) + .setSourcetype(split[1]) + .setSource(split[2]) + .setHost(split[3]) + .build(); + } + // setup metadata on RawEventBatch private EventBatch createRawEventBatch(final TopicPartition tp) { if (tp == null) { @@ -235,7 +314,7 @@ public String version() { public void onEventCommitted(final List batches) { // for (final EventBatch batch: batches) { - // assert batch.isCommitted(); + // assert batch.isCommitted(); // } } @@ -250,26 +329,31 @@ private Event createHecEventFrom(final SinkRecord record) { if (connectorConfig.raw) { RawEvent event = new RawEvent(record.value(), record); event.setLineBreaker(connectorConfig.lineBreaker); + if(connectorConfig.headerSupport) { + event = (RawEvent)addHeaders(event, record); + } return event; } - // meta data for /event endpoint is per event basis - JsonEvent event = new JsonEvent(record.value(), record); - if (connectorConfig.useRecordTimestamp && record.timestamp() != null) { - // record timestamp is in milliseconds - event.setTime(record.timestamp() / 1000.0); + JsonEvent event = null; + ObjectMapper objectMapper = new ObjectMapper(); + + if(connectorConfig.hecEventFormatted) { + try { + event = objectMapper.readValue(record.value().toString(), JsonEvent.class); + } catch(Exception e) { + log.error("event does not follow correct HEC pre-formatted format", record.toString()); + event = createHECEventNonFormatted(record); + } + } else { + event = createHECEventNonFormatted(record); } - Map metas = connectorConfig.topicMetas.get(record.topic()); - if (metas != null) { - event.setIndex(metas.get(SplunkSinkConnectorConfig.INDEX)); - event.setSourcetype(metas.get(SplunkSinkConnectorConfig.SOURCETYPE)); - event.setSource(metas.get(SplunkSinkConnectorConfig.SOURCE)); - event.addFields(connectorConfig.enrichments); + if(connectorConfig.headerSupport) { + addHeaders(event, record); } if (connectorConfig.trackData) { - // for data loss, latency tracking Map trackMetas = new HashMap<>(); trackMetas.put("kafka_offset", String.valueOf(record.kafkaOffset())); trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp())); @@ -277,12 +361,67 @@ private Event createHecEventFrom(final SinkRecord record) { trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition())); event.addFields(trackMetas); } - 
event.validate(); return event; } + private Event addHeaders(Event event, SinkRecord record) { + Headers headers = record.headers(); + if(headers.isEmpty() && connectorConfig.headerCustom.isEmpty()) { + return event; + } + + Header headerIndex = headers.lastWithName(connectorConfig.headerIndex); + Header headerHost = headers.lastWithName(connectorConfig.headerHost); + Header headerSource = headers.lastWithName(connectorConfig.headerSource); + Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype); + + if (headerIndex != null) { + event.setIndex(headerIndex.value().toString()); + } + if (headerHost != null) { + event.setHost(headerHost.value().toString()); + } + if (headerSource != null) { + event.setSource(headerSource.value().toString()); + } + if (headerSourcetype != null) { + event.setSourcetype(headerSourcetype.value().toString()); + } + + // Custom headers are configured with a comma separated list passed in configuration + // "custom_header_1,custom_header_2,custom_header_3" + if (!connectorConfig.headerCustom.isEmpty()) { + String[] customHeaders = connectorConfig.headerCustom.split(","); + Map headerMap = new HashMap<>(); + for (String header : customHeaders) { + Header customHeader = headers.lastWithName(header); + if (customHeader != null) { + headerMap.put(header, customHeader.value().toString()); + } + } + event.addFields(headerMap); + } + return event; + } + + private JsonEvent createHECEventNonFormatted(final SinkRecord record) { + JsonEvent event = new JsonEvent(record.value(), record); + if (connectorConfig.useRecordTimestamp && record.timestamp() != null) { + event.setTime(record.timestamp() / 1000.0); // record timestamp is in milliseconds + } + + Map metas = connectorConfig.topicMetas.get(record.topic()); + if (metas != null) { + event.setIndex(metas.get(SplunkSinkConnectorConfig.INDEX)); + event.setSourcetype(metas.get(SplunkSinkConnectorConfig.SOURCETYPE)); + event.setSource(metas.get(SplunkSinkConnectorConfig.SOURCE)); + event.addFields(connectorConfig.enrichments); + } + return event; + } + private Event createHecEventFromMalformed(final SinkRecord record) { Object data; if (connectorConfig.raw) { diff --git a/src/main/resources/version.properties b/src/main/resources/version.properties index f306651a..4ddaa874 100644 --- a/src/main/resources/version.properties +++ b/src/main/resources/version.properties @@ -1,3 +1,3 @@ -githash=@0f3e74e -gitbranch=issue117-rename-directories -gitversion=v1.0.0-LAR +githash=@1e92a95 +gitbranch=release/1.0.x +gitversion=v1.0.0 diff --git a/src/test/java/com/splunk/hecclient/RawEventBatchTest.java b/src/test/java/com/splunk/hecclient/RawEventBatchTest.java index 57a2473d..495fd8d3 100644 --- a/src/test/java/com/splunk/hecclient/RawEventBatchTest.java +++ b/src/test/java/com/splunk/hecclient/RawEventBatchTest.java @@ -103,4 +103,23 @@ public void getter() { Assert.assertEquals("index", batch.getIndex()); Assert.assertEquals(1, batch.getTime()); } + + @Test + public void checkEquals() { + RawEventBatch batchOne = RawEventBatch.factory() + .setSource("source3") + .setIndex("idx1") + .setSourcetype("sourcetype2") + .setHost("host4") + .build(); + + RawEventBatch batchTwo = RawEventBatch.factory() + .setSource("source") + .setIndex("idx") + .setSourcetype("1sourcetype2") + .setHost("3host4") + .build(); + + Assert.assertFalse(batchOne.equals(batchTwo)); + } } diff --git a/src/test/java/com/splunk/kafka/connect/ConfigProfile.java b/src/test/java/com/splunk/kafka/connect/ConfigProfile.java index f49ea8ca..215e393a 
100644 --- a/src/test/java/com/splunk/kafka/connect/ConfigProfile.java +++ b/src/test/java/com/splunk/kafka/connect/ConfigProfile.java @@ -28,6 +28,13 @@ public class ConfigProfile { private boolean trackData; private int maxBatchSize; private int numOfThreads; + private boolean headerSupport; + private boolean hecFormatted; + private String headerCustom; + private String headerIndex; + private String headerSource; + private String headerSourcetype; + private String headerHost; public ConfigProfile() { this(0); @@ -41,6 +48,8 @@ public ConfigProfile(int profile) { break; case 2: buildProfileTwo(); break; + case 3: buildProfileThree(); + break; default: buildProfileDefault(); break; } @@ -143,6 +152,37 @@ public ConfigProfile buildProfileTwo() { return this; } + public ConfigProfile buildProfileThree() { + this.topics = "kafka-data"; + this.token = "mytoken"; + this.uri = "https://dummy:8088"; + this.raw = true; + this.ack = false; + this.indexes = "index-1"; + this.sourcetypes = "kafka-data"; + this.sources = "kafka-connect"; + this.httpKeepAlive = true; + this.validateCertificates = false; + this.eventBatchTimeout = 1; + this.ackPollInterval = 1; + this.ackPollThreads = 1; + this.maxHttpConnPerChannel = 1; + this.totalHecChannels = 1; + this.socketTimeout = 1; + this.enrichements = "hello=world"; + this.enrichementMap = new HashMap<>(); + this.trackData = false; + this.maxBatchSize = 1; + this.numOfThreads = 1; + this.headerSupport = true; + this.headerIndex = "splunk.header.index"; + this.headerSource = "splunk.header.source"; + this.headerSourcetype = "splunk.header.sourcetype"; + this.headerHost = "splunk.header.host"; + + return this; + } + public String getTopics() { return topics; } @@ -335,6 +375,38 @@ public void setNumOfThreads(int numOfThreads) { this.numOfThreads = numOfThreads; } + public String getHeaderIndex() { + return headerIndex; + } + + public void setHeaderIndex(String headerIndex) { + this.headerIndex = headerIndex; + } + + public String getHeaderSource() { + return headerSource; + } + + public void setHeaderSource(String headerSource) { + this.headerSource = headerSource; + } + + public String getHeaderSourcetype() { + return headerSourcetype; + } + + public void setHeaderSourcetype(String headerSourcetype) { + this.headerSourcetype = headerSourcetype; + } + + public String getHeaderHost() { + return headerHost; + } + + public void setHeaderHost(String headerHost) { + this.headerHost = headerHost; + } + @Override public String toString() { return "ConfigProfile{" + "topics='" + topics + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}'; } diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkRecordTest.java 
b/src/test/java/com/splunk/kafka/connect/SplunkSinkRecordTest.java new file mode 100644 index 00000000..a8a58c65 --- /dev/null +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkRecordTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2017-2018 Splunk, Inc.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.splunk.kafka.connect; + +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class SplunkSinkRecordTest { + + @Test + public void checkKafkaHeaderUtilityGetters() { + UnitUtil uu = new UnitUtil(3); + Map config = uu.createTaskConfig(); + + SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config); + + SinkRecord record = setupRecord(); + Headers headers = record.headers(); + + headers.addString(uu.configProfile.getHeaderIndex(), "splunk.header.index"); + headers.addString(uu.configProfile.getHeaderHost(), "splunk.header.host"); + headers.addString(uu.configProfile.getHeaderSource(), "splunk.header.source"); + headers.addString(uu.configProfile.getHeaderSourcetype(), "splunk.header.sourcetype"); + + System.out.println(headers.toString()); + + SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig); + + Assert.assertEquals("splunk.header.index", (splunkSinkRecord.getSplunkHeaderIndex())); + Assert.assertEquals("splunk.header.host", (splunkSinkRecord.getSplunkHeaderHost())); + Assert.assertEquals("splunk.header.source", (splunkSinkRecord.getSplunkHeaderSource())); + Assert.assertEquals("splunk.header.sourcetype", (splunkSinkRecord.getSplunkHeaderSourcetype())); + } + + @Test + public void CompareRecordHeaders() { + UnitUtil uu = new UnitUtil(3); + Map config = uu.createTaskConfig(); + + SinkRecord record_1 = setupRecord(); + + Headers headers_1 = record_1.headers(); + headers_1.addString("splunk.header.index", "header-index"); + headers_1.addString("splunk.header.host", "header.splunk.com"); + headers_1.addString("splunk.header.source", "headersource"); + headers_1.addString("splunk.header.sourcetype", "test message"); + + SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config); + + SplunkSinkRecord splunkSinkRecord = new SplunkSinkRecord(record_1, connectorConfig); + + SinkRecord record_2 = setupRecord(); + + Headers headers_2 = record_2.headers(); + headers_2.addString("splunk.header.index", "header-index"); + headers_2.addString("splunk.header.host", "header.splunk.com"); + headers_2.addString("splunk.header.source", "headersource"); + headers_2.addString("splunk.header.sourcetype", "test message"); + + Assert.assertTrue(splunkSinkRecord.compareRecordHeaders(record_2)); + + SinkRecord record_3 = setupRecord(); + + Headers headers_3 = record_3.headers(); + headers_3.addString("splunk.header.index", "header-index=diff"); + headers_3.addString("splunk.header.host", "header.splunk.com"); + 
headers_3.addString("splunk.header.source", "headersource"); + headers_3.addString("splunk.header.sourcetype", "test message"); + + Assert.assertFalse(splunkSinkRecord.compareRecordHeaders(record_3)); + } + + public SinkRecord setupRecord() { + String topic = "test-topic"; + int partition = 1; + Schema keySchema = null; + Object key = "key"; + Schema valueSchema = null; + Object value = "value"; + long timestamp = System.currentTimeMillis(); + + SinkRecord record = createMockSinkRecord(topic, partition, keySchema, key, valueSchema, value, timestamp); + return record; + } + + public SinkRecord createMockSinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long timestamp) { + return new SinkRecord(topic, partition, keySchema, key, valueSchema, value, timestamp); + } +} \ No newline at end of file