From 7aeb6c408c07835a5a97c7b4ffbee3338f39c1c8 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 May 2019 15:44:56 -0700 Subject: [PATCH 1/6] fix --- .../streaming/continuous/ContinuousTrigger.scala | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala index fd0ff31199bd..5e9351165c47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration -import org.apache.commons.lang3.StringUtils - import org.apache.spark.annotation.Evolving import org.apache.spark.sql.streaming.Trigger import org.apache.spark.unsafe.types.CalendarInterval @@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger { private[sql] object ContinuousTrigger { def apply(interval: String): ContinuousTrigger = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "interval cannot be null or blank.") - } - val cal = if (interval.startsWith("interval")) { - CalendarInterval.fromString(interval) - } else { - CalendarInterval.fromString("interval " + interval) - } - if (cal == null) { - throw new IllegalArgumentException(s"Invalid interval: $interval") - } + val cal = CalendarInterval.fromUserString(interval) if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } From 2bca6554d8521ad3dc5090c22681a36fcd52cbe8 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 May 2019 15:45:14 -0700 Subject: [PATCH 2/6] fix --- .../spark/unsafe/types/CalendarInterval.java | 25 +++++++++++++++++++ .../unsafe/types/CalendarIntervalSuite.java | 25 +++++++++++++++++++ .../sql/catalyst/expressions/TimeWindow.scala | 15 +---------- .../scala/org/apache/spark/sql/Dataset.scala | 8 ++++-- .../execution/streaming/GroupStateImpl.scala | 15 +---------- .../spark/sql/streaming/ProcessingTime.scala | 13 +--------- 6 files changed, 59 insertions(+), 42 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 611b2a217aad..b0c72acc20b1 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -18,6 +18,7 @@ package org.apache.spark.unsafe.types; import java.io.Serializable; +import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -66,6 +67,10 @@ private static long toLong(String s) { } } + /** + * Convert a string to CalendarInterval. Return null if the input string is not a valid interval. + * This method is case-sensitive and all characters in the input string should be in lower case. + */ public static CalendarInterval fromString(String s) { if (s == null) { return null; @@ -87,6 +92,26 @@ public static CalendarInterval fromString(String s) { } } + /** + * Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and + * will throw IllegalArgumentException when the input string is not a valid interval. 
+ * + * @throws IllegalArgumentException if the string is not a valid internal. + */ + public static CalendarInterval fromCaseInsensitiveString(String s) { + if (s == null || s.trim().isEmpty()) { + throw new IllegalArgumentException("Interval cannot be null or blank."); + } + String sInLowerCase = s.trim().toLowerCase(Locale.ROOT); + String interval = + sInLowerCase.startsWith("interval ") ? sInLowerCase : "interval " + sInLowerCase; + CalendarInterval cal = CalendarInterval.fromString(interval); + if (cal == null) { + throw new IllegalArgumentException("Invalid interval: " + s); + } + return cal; + } + public static long toLongWithRange(String fieldName, String s, long minValue, long maxValue) throws IllegalArgumentException { long result = 0; diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index 1e556913b2dc..e5319e26ccd8 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -104,6 +104,31 @@ public void fromStringTest() { assertNull(fromString(input)); } + @Test + public void fromUserStringTest() { + for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) { + assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000)); + } + + for (String input : new String[]{null, "", " "}) { + try { + fromCaseInsensitiveString(input); + fail("Expected to throw an exception for the invalid input"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cannot be null or blank")); + } + } + + for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) { + try { + fromCaseInsensitiveString(input); + fail("Expected to throw an exception for the invalid input"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid interval")); + } + } + } + @Test public void fromYearMonthStringTest() { String input; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 8e48856d4607..0afd60011be3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -104,20 +104,7 @@ object TimeWindow { * precision. 
*/ private def getIntervalInMicroSeconds(interval: String): Long = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "The window duration, slide duration and start time cannot be null or blank.") - } - val intervalString = if (interval.startsWith("interval")) { - interval - } else { - "interval " + interval - } - val cal = CalendarInterval.fromString(intervalString) - if (cal == null) { - throw new IllegalArgumentException( - s"The provided interval ($interval) did not correspond to a valid interval string.") - } + val cal = CalendarInterval.fromUserString(interval) if (cal.months > 0) { throw new IllegalArgumentException( s"Intervals greater than a month is not supported ($interval).") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5d6e5306f174..6219c0ac30fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -695,8 +695,12 @@ class Dataset[T] private[sql]( // defined on a derived column cannot referenced elsewhere in the plan. def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan { val parsedDelay = - Option(CalendarInterval.fromString("interval " + delayThreshold)) - .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) + try { + CalendarInterval.fromUserString("interval " + delayThreshold) + } catch { + case e: IllegalArgumentException => + throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'", Some(e)) + } require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, s"delay threshold ($delayThreshold) should not be negative.") EliminateEventTimeWatermark( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index fcb230bd088d..960e2ab8c41e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -161,20 +161,7 @@ private[sql] class GroupStateImpl[S] private( def getTimeoutTimestamp: Long = timeoutTimestamp private def parseDuration(duration: String): Long = { - if (StringUtils.isBlank(duration)) { - throw new IllegalArgumentException( - "Provided duration is null or blank.") - } - val intervalString = if (duration.startsWith("interval")) { - duration - } else { - "interval " + duration - } - val cal = CalendarInterval.fromString(intervalString) - if (cal == null) { - throw new IllegalArgumentException( - s"Provided duration ($duration) is not valid.") - } + val cal = CalendarInterval.fromUserString(duration) if (cal.milliseconds < 0 || cal.months < 0) { throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index 38b0776ec1fe..5482196274f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -76,18 +76,7 @@ object ProcessingTime { */ @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") def apply(interval: String): ProcessingTime = { - if (StringUtils.isBlank(interval)) { - throw new 
IllegalArgumentException( - "interval cannot be null or blank.") - } - val cal = if (interval.startsWith("interval")) { - CalendarInterval.fromString(interval) - } else { - CalendarInterval.fromString("interval " + interval) - } - if (cal == null) { - throw new IllegalArgumentException(s"Invalid interval: $interval") - } + val cal = CalendarInterval.fromUserString(interval) if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } From 02f6f33e9bcb7b1528fa1f06706cdc22a1f2c903 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 May 2019 15:51:51 -0700 Subject: [PATCH 3/6] fix --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6219c0ac30fd..74cb3e627432 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -696,10 +696,12 @@ class Dataset[T] private[sql]( def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan { val parsedDelay = try { - CalendarInterval.fromUserString("interval " + delayThreshold) + CalendarInterval.fromCaseInsensitiveString(delayThreshold) } catch { case e: IllegalArgumentException => - throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'", Some(e)) + throw new AnalysisException( + s"Unable to parse time delay '$delayThreshold'", + cause = Some(e)) } require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, s"delay threshold ($delayThreshold) should not be negative.") From 2141445ac754c71047c9673d44fa0828426c83d9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 May 2019 16:02:43 -0700 Subject: [PATCH 4/6] minor --- .../java/org/apache/spark/unsafe/types/CalendarInterval.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index b0c72acc20b1..e36efa3b0f22 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -105,7 +105,7 @@ public static CalendarInterval fromCaseInsensitiveString(String s) { String sInLowerCase = s.trim().toLowerCase(Locale.ROOT); String interval = sInLowerCase.startsWith("interval ") ? 
sInLowerCase : "interval " + sInLowerCase; - CalendarInterval cal = CalendarInterval.fromString(interval); + CalendarInterval cal = fromString(interval); if (cal == null) { throw new IllegalArgumentException("Invalid interval: " + s); } From 50507c55f15fa6aa1033fcd6ece2ce4fc7172ae9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 May 2019 17:51:32 -0700 Subject: [PATCH 5/6] fix --- .../org/apache/spark/unsafe/types/CalendarIntervalSuite.java | 2 +- .../org/apache/spark/sql/catalyst/expressions/TimeWindow.scala | 2 +- .../apache/spark/sql/execution/streaming/GroupStateImpl.scala | 2 +- .../sql/execution/streaming/continuous/ContinuousTrigger.scala | 2 +- .../scala/org/apache/spark/sql/streaming/ProcessingTime.scala | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index e5319e26ccd8..994af8f08244 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -105,7 +105,7 @@ public void fromStringTest() { } @Test - public void fromUserStringTest() { + public void fromCaseInsensitiveStringTest() { for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) { assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000)); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 0afd60011be3..0c85939a21ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -104,7 +104,7 @@ object TimeWindow { * precision. 
*/ private def getIntervalInMicroSeconds(interval: String): Long = { - val cal = CalendarInterval.fromUserString(interval) + val cal = CalendarInterval.fromCaseInsensitiveString(interval) if (cal.months > 0) { throw new IllegalArgumentException( s"Intervals greater than a month is not supported ($interval).") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index 960e2ab8c41e..26ec39ef8f6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -161,7 +161,7 @@ private[sql] class GroupStateImpl[S] private( def getTimeoutTimestamp: Long = timeoutTimestamp private def parseDuration(duration: String): Long = { - val cal = CalendarInterval.fromUserString(duration) + val cal = CalendarInterval.fromCaseInsensitiveString(duration) if (cal.milliseconds < 0 || cal.months < 0) { throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala index 5e9351165c47..bd343f380603 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala @@ -36,7 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger { private[sql] object ContinuousTrigger { def apply(interval: String): ContinuousTrigger = { - val cal = CalendarInterval.fromUserString(interval) + val cal = CalendarInterval.fromCaseInsensitiveString(interval) if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index 5482196274f1..c4af410849d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -76,7 +76,7 @@ object ProcessingTime { */ @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") def apply(interval: String): ProcessingTime = { - val cal = CalendarInterval.fromUserString(interval) + val cal = CalendarInterval.fromCaseInsensitiveString(interval) if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } From 6e4b73332137fe81d285707f15d43bbcc9432763 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 16 May 2019 10:50:52 -0700 Subject: [PATCH 6/6] remove unused imports --- .../org/apache/spark/sql/catalyst/expressions/TimeWindow.scala | 2 -- .../apache/spark/sql/execution/streaming/GroupStateImpl.scala | 2 -- .../scala/org/apache/spark/sql/streaming/ProcessingTime.scala | 2 -- 3 files changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 0c85939a21ac..9aae678deb4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.expressions -import org.apache.commons.lang3.StringUtils - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index 26ec39ef8f6e..dda9d41f630e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.streaming import java.sql.Date import java.util.concurrent.TimeUnit -import org.apache.commons.lang3.StringUtils - import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout} import org.apache.spark.sql.execution.streaming.GroupStateImpl._ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index c4af410849d5..417d698bdbb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration -import org.apache.commons.lang3.StringUtils - import org.apache.spark.annotation.Evolving import org.apache.spark.unsafe.types.CalendarInterval
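
For reference, a minimal usage sketch of the new CalendarInterval.fromCaseInsensitiveString API (not part of the patches above). It is written against the behavior exercised in CalendarIntervalSuite in patch 2 and assumes a Spark build that already contains these commits:

    import org.apache.spark.unsafe.types.CalendarInterval

    // Parsing is case-insensitive and the "interval " prefix is optional,
    // so all of these produce the same 5-minute interval.
    val a = CalendarInterval.fromCaseInsensitiveString("5 MINUTES")
    val b = CalendarInterval.fromCaseInsensitiveString("interval 5 minutes")
    assert(a == b)
    assert(a.months == 0 && a.microseconds == 5L * 60 * 1000 * 1000)

    // Null, blank, or malformed input fails fast with IllegalArgumentException
    // instead of returning null, which is what the updated call sites rely on.
    try {
      CalendarInterval.fromCaseInsensitiveString("foo 1 day")
    } catch {
      case e: IllegalArgumentException =>
        assert(e.getMessage.contains("Invalid interval"))
    }

Centralizing the parsing in this one method is what lets TimeWindow, GroupStateImpl, ProcessingTime, ContinuousTrigger and Dataset.withWatermark drop their duplicated StringUtils.isBlank / fromString null-check boilerplate in the later patches.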