diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml index 9c8f0d30..6a6e7beb 100644 --- a/dependency-reduced-pom.xml +++ b/dependency-reduced-pom.xml @@ -204,4 +204,3 @@ 1.3.2 - diff --git a/pom.xml b/pom.xml index 2ee8669a..9bc9627e 100644 --- a/pom.xml +++ b/pom.xml @@ -342,4 +342,4 @@ - + \ No newline at end of file diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 84097c79..b84dd277 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -98,6 +98,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String ENABLE_TIMESTAMP_EXTRACTION_CONF = "enable.timestamp.extraction"; static final String REGEX_CONF = "timestamp.regex"; static final String TIMESTAMP_FORMAT_CONF = "timestamp.format"; + static final String TIMESTAMP_TIMEZONE_CONF = "timestamp.timezone"; // Kafka configuration description strings // Required Parameters @@ -206,6 +207,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String ENABLE_TIMESTAMP_EXTRACTION_DOC = "Set to true if you want to extract the timestamp"; static final String REGEX_DOC = "Regex"; static final String TIMESTAMP_FORMAT_DOC = "Timestamp format"; + static final String TIMESTAMP_TIMEZONE_DOC = "Timestamp timezone"; final String splunkToken; final String splunkURI; @@ -263,6 +265,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String timestampFormat; final int queueCapacity; + final String timeZone; + SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); splunkToken = getPassword(TOKEN_CONF).value(); @@ -316,6 +320,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { enableTimestampExtraction = getBoolean(ENABLE_TIMESTAMP_EXTRACTION_CONF); regex = getString(REGEX_CONF); timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim(); + timeZone = getString(TIMESTAMP_TIMEZONE_CONF); validateRegexForTimestamp(regex); queueCapacity = getInt(QUEUE_CAPACITY_CONF); validateQueueCapacity(queueCapacity); @@ -367,7 +372,8 @@ public static ConfigDef conf() { .define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC) .define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC) .define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC) - .define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC); + .define(TIMESTAMP_TIMEZONE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_TIMEZONE_DOC) + .define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC); } /** diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index d51b8b98..a970c567 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -579,8 +579,12 @@ private void timestampExtraction(Event event) { } } else { SimpleDateFormat df = new SimpleDateFormat(connectorConfig.timestampFormat); + df.setTimeZone(TimeZone.getTimeZone("UTC")); Date date; try { + if(!connectorConfig.timeZone.isEmpty()) + df.setTimeZone(TimeZone.getTimeZone(connectorConfig.timeZone)); + date = df.parse(timestamp); event.setTime(date.getTime() / 1000.0); } catch (ParseException e) { diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index f5ae931e..d4d08d95 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -274,7 +274,69 @@ public void checkExtractedTimestamp() { } task.stop(); } - + + @Test + public void checkExtractedTimestampWithTimezone() { + SplunkSinkTask task = new SplunkSinkTask(); + Collection record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"REQ_TIME\": \"20230904133016993\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}"); + + UnitUtil uu = new UnitUtil(0); + Map config = uu.createTaskConfig(); + config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false)); + config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true)); + config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"REQ_TIME\\\":\\s*\\\"(?