From 9576ca751fe1b8302924f0c64f09225c3cd53b2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AA=A8=EC=83=81=ED=98=81?= Date: Mon, 28 Aug 2023 11:25:42 +0900 Subject: [PATCH 1/5] added a timestamp.timezone configuration in SplunkSinkConnectorConfig for the connector config --- dependency-reduced-pom.xml | 8 +++++++- pom.xml | 2 +- .../splunk/kafka/connect/SplunkSinkConnectorConfig.java | 8 +++++++- .../java/com/splunk/kafka/connect/SplunkSinkTask.java | 5 ++++- .../java/com/splunk/kafka/connect/SplunkSinkTaskTest.java | 1 + 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index b30ed2d3..6de8fb3e 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -4,7 +4,7 @@ com.github.splunk.kafka.connect splunk-kafka-connect splunk-kafka-connect - v2.1.0 + v2.1.2 @@ -107,6 +107,12 @@ + + + confluent + https://packages.confluent.io/maven/ + + org.junit.jupiter diff --git a/pom.xml b/pom.xml index 70a611f1..c8d4c3dd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.splunk.kafka.connect splunk-kafka-connect - v2.1.0 + v2.1.2 splunk-kafka-connect diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 2d18f587..e1bf1744 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -95,6 +95,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String ENABLE_TIMESTAMP_EXTRACTION_CONF = "enable.timestamp.extraction"; static final String REGEX_CONF = "timestamp.regex"; static final String TIMESTAMP_FORMAT_CONF = "timestamp.format"; + static final String TIMESTAMP_TIMEZONE_CONF = "timestamp.timezone"; // Kafka configuration description strings // Required Parameters @@ -202,6 +203,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String ENABLE_TIMESTAMP_EXTRACTION_DOC = "Set to true if you want to extract the timestamp"; static final String REGEX_DOC = "Regex"; static final String TIMESTAMP_FORMAT_DOC = "Timestamp format"; + static final String TIMESTAMP_TIMEZONE_DOC = "Timestamp timezone"; final String splunkToken; final String splunkURI; @@ -258,6 +260,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String regex; final String timestampFormat; + final String timeZone; + SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); splunkToken = getPassword(TOKEN_CONF).value(); @@ -311,6 +315,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { enableTimestampExtraction = getBoolean(ENABLE_TIMESTAMP_EXTRACTION_CONF); regex = getString(REGEX_CONF); timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim(); + timeZone = getString(TIMESTAMP_TIMEZONE_CONF); validateRegexForTimestamp(regex); } @@ -360,7 +365,8 @@ public static ConfigDef conf() { .define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC) .define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC) .define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC) - .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC); + .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC) + .define(TIMESTAMP_TIMEZONE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_TIMEZONE_DOC); } /** diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index 9179a9f1..b459009f 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -578,9 +578,12 @@ private void timestampExtraction(Event event) { log.warn("Could not set the time", e); } } else { - SimpleDateFormat df = new SimpleDateFormat(connectorConfig.timestampFormat); + SimpleDateFormat df = new SimpleDateFormat(connectorConfig.timestampFormat,Locale.ENGLISH); Date date; try { + if(!connectorConfig.timeZone.isEmpty()) + df.setTimeZone(TimeZone.getTimeZone(connectorConfig.timeZone)); + date = df.parse(timestamp); event.setTime(date.getTime() / 1000.0); } catch (ParseException e) { diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index f5ae931e..d1c9635a 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -257,6 +257,7 @@ public void checkExtractedTimestamp() { config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true)); config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"time\\\":\\s*\\\"(?