From a2d83b5635d3c3383a36681358a55963e9900b06 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 19 Oct 2019 19:33:44 +0300 Subject: [PATCH 01/11] Add getDuration() and isPositive() --- .../spark/unsafe/types/CalendarInterval.java | 22 +++++++++++++++ .../unsafe/types/CalendarIntervalSuite.java | 28 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 28fb64f7cd0e..3c4c18e3f4b1 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -18,6 +18,7 @@ package org.apache.spark.unsafe.types; import java.io.Serializable; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -409,4 +410,25 @@ private void appendUnit(StringBuilder sb, long value, String unit) { sb.append(' ').append(value).append(' ').append(unit).append('s'); } } + + /** + * Gets interval duration + * @param daysPerMonth the number of days per one month + * @param targetUnit time units of the result + * @return duration in the specified time units + */ + public long getDuration(int daysPerMonth, TimeUnit targetUnit) { + long monthsDuration = Math.multiplyExact(daysPerMonth * MICROS_PER_DAY, months); + long result = Math.addExact(microseconds, monthsDuration); + return targetUnit.convert(result, TimeUnit.MICROSECONDS); + } + + /** + * Checks the interval is positive + * @param daysPerMonth the number of days per one month + * @return true if duration of the given interval is greater than 0 otherwise false + */ + public boolean isPositive(int daysPerMonth) { + return getDuration(daysPerMonth, TimeUnit.MICROSECONDS) > 0; + } } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index 587071332ce4..140b90c6500b 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -20,6 +20,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; import static org.apache.spark.unsafe.types.CalendarInterval.*; @@ -297,4 +298,31 @@ public void fromStringCaseSensitivityTest() { assertNull(fromString("INTERVAL")); assertNull(fromString(" Interval ")); } + + @Test + public void durationTest() { + assertEquals(fromString("0 seconds").getDuration(31, TimeUnit.MILLISECONDS), 0); + assertEquals(fromString("1 month").getDuration(31, TimeUnit.DAYS), 31); + assertEquals(fromString("1 microsecond").getDuration(30, TimeUnit.MICROSECONDS), 1); + assertEquals(fromString("1 month -30 days").getDuration(31, TimeUnit.DAYS), 1); + + try { + fromString(Integer.MAX_VALUE + " month").getDuration(31, TimeUnit.SECONDS); + fail("Expected to throw an exception for the invalid input"); + } catch (ArithmeticException e) { + assertTrue(e.getMessage().contains("overflow")); + } + } + + @Test + public void pisitiveIntervalTest() { + assertTrue(fromString("1 months").isPositive(28)); + assertTrue(fromString("1 microsecond").isPositive(30)); + assertTrue(fromString("1 year -360 days").isPositive(31)); + assertTrue(fromString("-1 year 380 days").isPositive(31)); + + assertFalse(fromString("0 
months").isPositive(28)); + assertFalse(fromString("-1 month 30 days").isPositive(31)); + assertFalse(fromString("2 months -60 days").isPositive(30)); + } } From f880fd3c5c082fa5f00812a34c7d5251f6db5af8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 19 Oct 2019 19:43:52 +0300 Subject: [PATCH 02/11] Replace isPositive by isNegative --- .../spark/unsafe/types/CalendarInterval.java | 8 ++++---- .../unsafe/types/CalendarIntervalSuite.java | 19 ++++++++++--------- .../plans/logical/EventTimeWatermark.scala | 3 +-- .../execution/streaming/GroupStateImpl.scala | 3 +-- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 3c4c18e3f4b1..7947417f2352 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -424,11 +424,11 @@ public long getDuration(int daysPerMonth, TimeUnit targetUnit) { } /** - * Checks the interval is positive + * Checks the interval is negative * @param daysPerMonth the number of days per one month - * @return true if duration of the given interval is greater than 0 otherwise false + * @return true if duration of the given interval is less than 0 otherwise false */ - public boolean isPositive(int daysPerMonth) { - return getDuration(daysPerMonth, TimeUnit.MICROSECONDS) > 0; + public boolean isNegative(int daysPerMonth) { + return getDuration(daysPerMonth, TimeUnit.MICROSECONDS) < 0; } } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index 140b90c6500b..481dc9edc9eb 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -315,14 +315,15 @@ public void durationTest() { } @Test - public void pisitiveIntervalTest() { - assertTrue(fromString("1 months").isPositive(28)); - assertTrue(fromString("1 microsecond").isPositive(30)); - assertTrue(fromString("1 year -360 days").isPositive(31)); - assertTrue(fromString("-1 year 380 days").isPositive(31)); - - assertFalse(fromString("0 months").isPositive(28)); - assertFalse(fromString("-1 month 30 days").isPositive(31)); - assertFalse(fromString("2 months -60 days").isPositive(30)); + public void negativeIntervalTest() { + assertTrue(fromString("-1 months").isNegative(28)); + assertTrue(fromString("-1 microsecond").isNegative(30)); + assertTrue(fromString("-1 month 30 days").isNegative(31)); + assertTrue(fromString("2 months -61 days").isNegative(30)); + assertTrue(fromString("-1 year -2 seconds").isNegative(30)); + + assertFalse(fromString("0 months").isNegative(28)); + assertFalse(fromString("1 year -360 days").isNegative(31)); + assertFalse(fromString("-1 year 380 days").isNegative(31)); } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 8441c2c481ec..1b26758e4d29 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -29,8 +29,7 @@ object 
EventTimeWatermark { def getDelayMs(delay: CalendarInterval): Long = { // We define month as `31 days` to simplify calculation. - val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 - delay.milliseconds + delay.months * millisPerMonth + delay.getDuration(31, TimeUnit.MILLISECONDS) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index dda9d41f630e..1cb446e19ffa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -164,8 +164,7 @@ private[sql] class GroupStateImpl[S] private( throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } - val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 - cal.milliseconds + cal.months * millisPerMonth + cal.getDuration(31, TimeUnit.MILLISECONDS) } private def checkTimeoutTimestampAllowed(): Unit = { From 1f5d68485bd34867d3695cb5468b0f5ff291adc7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 19 Oct 2019 19:57:43 +0300 Subject: [PATCH 03/11] Use isNegative() --- .../spark/sql/catalyst/analysis/StreamingJoinHelper.scala | 2 +- .../apache/spark/sql/catalyst/expressions/TimeWindow.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../spark/sql/execution/streaming/GroupStateImpl.scala | 7 ++++--- .../apache/spark/sql/execution/streaming/Triggers.scala | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala index c1d72f9b58a4..c64aeff3c238 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala @@ -256,7 +256,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging { val castedLit = lit.dataType match { case CalendarIntervalType => val calendarInterval = lit.value.asInstanceOf[CalendarInterval] - if (calendarInterval.months > 0) { + if (calendarInterval.months != 0) { invalid = true logWarning( s"Failed to extract state value watermark from condition $exprToCollectFrom " + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 9aae678deb4b..105c09da0e4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -103,7 +103,7 @@ object TimeWindow { */ private def getIntervalInMicroSeconds(interval: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0) { + if (cal.months != 0) { throw new IllegalArgumentException( s"Intervals greater than a month is not supported ($interval).") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 076270a9f1c6..8e51de718fd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
@@ -731,7 +731,7 @@ class Dataset[T] private[sql]( s"Unable to parse time delay '$delayThreshold'", cause = Some(e)) } - require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, + require(!parsedDelay.isNegative(31), s"delay threshold ($delayThreshold) should not be negative.") EliminateEventTimeWatermark( EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index 1cb446e19ffa..90b4ac97aab9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -160,11 +160,12 @@ private[sql] class GroupStateImpl[S] private( private def parseDuration(duration: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(duration) - if (cal.milliseconds < 0 || cal.months < 0) { - throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") + val daysPerMonth = 31 + if (cal.isNegative(daysPerMonth)) { + throw new IllegalArgumentException(s"Provided duration ($duration) is negative") } - cal.getDuration(31, TimeUnit.MILLISECONDS) + cal.getDuration(daysPerMonth, TimeUnit.MILLISECONDS) } private def checkTimeoutTimestampAllowed(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala index 2bdb3402c14b..abc3166c3900 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -31,7 +31,7 @@ private object Triggers { def convert(interval: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0) { + if (cal.months != 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } TimeUnit.MICROSECONDS.toMillis(cal.microseconds) From ab6036ca1e32dfd81b9bfd8187ef0dc2acce2720 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 19 Oct 2019 22:34:16 +0300 Subject: [PATCH 04/11] Fix FlatMapGroupsWithStateSuite --- .../sql/streaming/FlatMapGroupsWithStateSuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index df7e9217f914..d36c64f61a72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -125,6 +125,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { var state: GroupStateImpl[Int] = GroupStateImpl.createForStreaming( None, 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false, watermarkPresent = false) assert(state.getTimeoutTimestamp === NO_TIMESTAMP) + state.setTimeoutDuration("-1 month 31 days 1 second") + assert(state.getTimeoutTimestamp === 2000) state.setTimeoutDuration(500) assert(state.getTimeoutTimestamp === 1500) // can be set without initializing state testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) @@ -225,8 +227,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { testIllegalTimeout 
{ state.setTimeoutDuration("-1 month") } + testIllegalTimeout { - state.setTimeoutDuration("1 month -1 day") + state.setTimeoutDuration("1 month -31 day") } state = GroupStateImpl.createForStreaming( @@ -241,7 +244,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { state.setTimeoutTimestamp(10000, "-1 month") } testIllegalTimeout { - state.setTimeoutTimestamp(10000, "1 month -1 day") + state.setTimeoutTimestamp(10000, "1 month -32 day") } testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000)) @@ -253,7 +256,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { state.setTimeoutTimestamp(new Date(-10000), "-1 month") } testIllegalTimeout { - state.setTimeoutTimestamp(new Date(-10000), "1 month -1 day") + state.setTimeoutTimestamp(new Date(-10000), "1 month -32 day") } } From 6afa0062e0f3782fa3f81e114487c6c939342b3b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 22 Oct 2019 23:39:41 +0300 Subject: [PATCH 05/11] Move getDuration() and isNegative() to IntervalUtils --- .../spark/unsafe/types/CalendarInterval.java | 22 ------- .../unsafe/types/CalendarIntervalSuite.java | 29 --------- .../plans/logical/EventTimeWatermark.scala | 3 +- .../sql/catalyst/util/IntervalUtils.scala | 30 +++++++++ .../catalyst/util/IntervalUtilsSuite.scala | 63 +++++++++++++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 3 +- .../execution/streaming/GroupStateImpl.scala | 5 +- 7 files changed, 100 insertions(+), 55 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 7947417f2352..28fb64f7cd0e 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -18,7 +18,6 @@ package org.apache.spark.unsafe.types; import java.io.Serializable; -import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -410,25 +409,4 @@ private void appendUnit(StringBuilder sb, long value, String unit) { sb.append(' ').append(value).append(' ').append(unit).append('s'); } } - - /** - * Gets interval duration - * @param daysPerMonth the number of days per one month - * @param targetUnit time units of the result - * @return duration in the specified time units - */ - public long getDuration(int daysPerMonth, TimeUnit targetUnit) { - long monthsDuration = Math.multiplyExact(daysPerMonth * MICROS_PER_DAY, months); - long result = Math.addExact(microseconds, monthsDuration); - return targetUnit.convert(result, TimeUnit.MICROSECONDS); - } - - /** - * Checks the interval is negative - * @param daysPerMonth the number of days per one month - * @return true if duration of the given interval is less than 0 otherwise false - */ - public boolean isNegative(int daysPerMonth) { - return getDuration(daysPerMonth, TimeUnit.MICROSECONDS) < 0; - } } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index 481dc9edc9eb..587071332ce4 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -20,7 +20,6 @@ import org.junit.Test; import 
java.util.Arrays; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; import static org.apache.spark.unsafe.types.CalendarInterval.*; @@ -298,32 +297,4 @@ public void fromStringCaseSensitivityTest() { assertNull(fromString("INTERVAL")); assertNull(fromString(" Interval ")); } - - @Test - public void durationTest() { - assertEquals(fromString("0 seconds").getDuration(31, TimeUnit.MILLISECONDS), 0); - assertEquals(fromString("1 month").getDuration(31, TimeUnit.DAYS), 31); - assertEquals(fromString("1 microsecond").getDuration(30, TimeUnit.MICROSECONDS), 1); - assertEquals(fromString("1 month -30 days").getDuration(31, TimeUnit.DAYS), 1); - - try { - fromString(Integer.MAX_VALUE + " month").getDuration(31, TimeUnit.SECONDS); - fail("Expected to throw an exception for the invalid input"); - } catch (ArithmeticException e) { - assertTrue(e.getMessage().contains("overflow")); - } - } - - @Test - public void negativeIntervalTest() { - assertTrue(fromString("-1 months").isNegative(28)); - assertTrue(fromString("-1 microsecond").isNegative(30)); - assertTrue(fromString("-1 month 30 days").isNegative(31)); - assertTrue(fromString("2 months -61 days").isNegative(30)); - assertTrue(fromString("-1 year -2 seconds").isNegative(30)); - - assertFalse(fromString("0 months").isNegative(28)); - assertFalse(fromString("1 year -360 days").isNegative(31)); - assertFalse(fromString("-1 year 380 days").isNegative(31)); - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 1b26758e4d29..7f5b044636c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical import java.util.concurrent.TimeUnit import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval @@ -29,7 +30,7 @@ object EventTimeWatermark { def getDelayMs(delay: CalendarInterval): Long = { // We define month as `31 days` to simplify calculation. 
- delay.getDuration(31, TimeUnit.MILLISECONDS) + IntervalUtils.getDuration(delay, 31, TimeUnit.MILLISECONDS) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index 78d188f81f62..6c85dd7c278f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.util +import java.util.concurrent.TimeUnit + import org.apache.spark.sql.types.Decimal import org.apache.spark.unsafe.types.CalendarInterval @@ -88,4 +90,32 @@ object IntervalUtils { result += MICROS_PER_MONTH * (interval.months % MONTHS_PER_YEAR) Decimal(result, 18, 6) } + + /** + * Gets interval duration + * + * @param cal - the interval to get duration + * @param daysPerMonth - the number of days per one month + * @param targetUnit - time units of the result + * @return duration in the specified time units + */ + def getDuration( + cal: CalendarInterval, + daysPerMonth: Int, + targetUnit: TimeUnit): Long = { + val monthsDuration = Math.multiplyExact(daysPerMonth * DateTimeUtils.MICROS_PER_DAY, cal.months) + val result = Math.addExact(cal.microseconds, monthsDuration) + targetUnit.convert(result, TimeUnit.MICROSECONDS) + } + + /** + * Checks the interval is negative + * + * @param cal - the checked interval + * @param daysPerMonth - the number of days per one month + * @return true if duration of the given interval is less than 0 otherwise false + */ + def isNegative(cal: CalendarInterval, daysPerMonth: Int): Boolean = { + getDuration(cal, daysPerMonth, TimeUnit.MICROSECONDS) < 0 + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala new file mode 100644 index 000000000000..665ef290e0b4 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +import java.util.concurrent.TimeUnit + +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.IntervalUtils._ +import org.apache.spark.unsafe.types.CalendarInterval + +class IntervalUtilsSuite extends SparkFunSuite with Matchers { + test("interval duration") { + def duration(s: String, daysPerMonth: Int, unit: TimeUnit): Long = { + getDuration(CalendarInterval.fromString(s), daysPerMonth, unit) + } + + assert(duration("0 seconds", 31, TimeUnit.MILLISECONDS) === 0) + assert(duration("1 month", 31, TimeUnit.DAYS) === 31) + assert(duration("1 microsecond", 30, TimeUnit.MICROSECONDS) === 1) + assert(duration("1 month -30 days", 31, TimeUnit.DAYS) === 1) + + try { + duration(Integer.MAX_VALUE + " month", 31, TimeUnit.SECONDS) + fail("Expected to throw an exception for the invalid input") + } catch { + case e: ArithmeticException => + assert(e.getMessage.contains("overflow")) + } + } + + test("negative interval") { + def isNegative(s: String, daysPerMonth: Int): Boolean = { + IntervalUtils.isNegative(CalendarInterval.fromString(s), daysPerMonth) + } + + assert(isNegative("-1 months", 28)) + assert(isNegative("-1 microsecond", 30)) + assert(isNegative("-1 month 30 days", 31)) + assert(isNegative("2 months -61 days", 30)) + assert(isNegative("-1 year -2 seconds", 30)) + assert(!isNegative("0 months", 28)) + assert(!isNegative("1 year -360 days", 31)) + assert(!isNegative("-1 year 380 days", 31)) + + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 8e51de718fd6..152fc1a6aaf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters} import org.apache.spark.sql.execution.command._ @@ -731,7 +732,7 @@ class Dataset[T] private[sql]( s"Unable to parse time delay '$delayThreshold'", cause = Some(e)) } - require(!parsedDelay.isNegative(31), + require(!IntervalUtils.isNegative(parsedDelay, 31), s"delay threshold ($delayThreshold) should not be negative.") EliminateEventTimeWatermark( EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index 90b4ac97aab9..d2c648c72abd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -21,6 +21,7 @@ import java.sql.Date import java.util.concurrent.TimeUnit import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout} +import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.execution.streaming.GroupStateImpl._ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} import org.apache.spark.unsafe.types.CalendarInterval @@ -161,11 
+162,11 @@ private[sql] class GroupStateImpl[S] private( private def parseDuration(duration: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(duration) val daysPerMonth = 31 - if (cal.isNegative(daysPerMonth)) { + if (IntervalUtils.isNegative(cal, daysPerMonth)) { throw new IllegalArgumentException(s"Provided duration ($duration) is negative") } - cal.getDuration(daysPerMonth, TimeUnit.MILLISECONDS) + IntervalUtils.getDuration(cal, daysPerMonth, TimeUnit.MILLISECONDS) } private def checkTimeoutTimestampAllowed(): Unit = { From 084c8d5716151e677628a39756e2d52fe501f9c7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 23 Oct 2019 20:01:37 +0300 Subject: [PATCH 06/11] Set default daysPerMonth to 31 --- .../plans/logical/EventTimeWatermark.scala | 2 +- .../spark/sql/catalyst/util/IntervalUtils.scala | 10 +++++----- .../sql/catalyst/util/IntervalUtilsSuite.scala | 14 +++++++------- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../sql/execution/streaming/GroupStateImpl.scala | 5 ++--- 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 7f5b044636c9..fd48b4b291aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -30,7 +30,7 @@ object EventTimeWatermark { def getDelayMs(delay: CalendarInterval): Long = { // We define month as `31 days` to simplify calculation. - IntervalUtils.getDuration(delay, 31, TimeUnit.MILLISECONDS) + IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index 6c85dd7c278f..7ffe6e46e4e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -95,14 +95,14 @@ object IntervalUtils { * Gets interval duration * * @param cal - the interval to get duration - * @param daysPerMonth - the number of days per one month * @param targetUnit - time units of the result + * @param daysPerMonth - the number of days per one month * @return duration in the specified time units */ def getDuration( cal: CalendarInterval, - daysPerMonth: Int, - targetUnit: TimeUnit): Long = { + targetUnit: TimeUnit, + daysPerMonth: Int = 31): Long = { val monthsDuration = Math.multiplyExact(daysPerMonth * DateTimeUtils.MICROS_PER_DAY, cal.months) val result = Math.addExact(cal.microseconds, monthsDuration) targetUnit.convert(result, TimeUnit.MICROSECONDS) @@ -115,7 +115,7 @@ object IntervalUtils { * @param daysPerMonth - the number of days per one month * @return true if duration of the given interval is less than 0 otherwise false */ - def isNegative(cal: CalendarInterval, daysPerMonth: Int): Boolean = { - getDuration(cal, daysPerMonth, TimeUnit.MICROSECONDS) < 0 + def isNegative(cal: CalendarInterval, daysPerMonth: Int = 31): Boolean = { + getDuration(cal, TimeUnit.MICROSECONDS, daysPerMonth) < 0 } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala 
index 665ef290e0b4..fd5003c5b56a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala @@ -27,17 +27,17 @@ import org.apache.spark.unsafe.types.CalendarInterval class IntervalUtilsSuite extends SparkFunSuite with Matchers { test("interval duration") { - def duration(s: String, daysPerMonth: Int, unit: TimeUnit): Long = { - getDuration(CalendarInterval.fromString(s), daysPerMonth, unit) + def duration(s: String, unit: TimeUnit, daysPerMonth: Int): Long = { + getDuration(CalendarInterval.fromString(s), unit, daysPerMonth) } - assert(duration("0 seconds", 31, TimeUnit.MILLISECONDS) === 0) - assert(duration("1 month", 31, TimeUnit.DAYS) === 31) - assert(duration("1 microsecond", 30, TimeUnit.MICROSECONDS) === 1) - assert(duration("1 month -30 days", 31, TimeUnit.DAYS) === 1) + assert(duration("0 seconds", TimeUnit.MILLISECONDS, 31) === 0) + assert(duration("1 month", TimeUnit.DAYS, 31) === 31) + assert(duration("1 microsecond", TimeUnit.MICROSECONDS, 30) === 1) + assert(duration("1 month -30 days", TimeUnit.DAYS, 31) === 1) try { - duration(Integer.MAX_VALUE + " month", 31, TimeUnit.SECONDS) + duration(Integer.MAX_VALUE + " month", TimeUnit.SECONDS, 31) fail("Expected to throw an exception for the invalid input") } catch { case e: ArithmeticException => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 152fc1a6aaf8..786e75de461c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -732,7 +732,7 @@ class Dataset[T] private[sql]( s"Unable to parse time delay '$delayThreshold'", cause = Some(e)) } - require(!IntervalUtils.isNegative(parsedDelay, 31), + require(!IntervalUtils.isNegative(parsedDelay), s"delay threshold ($delayThreshold) should not be negative.") EliminateEventTimeWatermark( EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index d2c648c72abd..1c51d5188bb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -161,12 +161,11 @@ private[sql] class GroupStateImpl[S] private( private def parseDuration(duration: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(duration) - val daysPerMonth = 31 - if (IntervalUtils.isNegative(cal, daysPerMonth)) { + if (IntervalUtils.isNegative(cal)) { throw new IllegalArgumentException(s"Provided duration ($duration) is negative") } - IntervalUtils.getDuration(cal, daysPerMonth, TimeUnit.MILLISECONDS) + IntervalUtils.getDuration(cal, TimeUnit.MILLISECONDS) } private def checkTimeoutTimestampAllowed(): Unit = { From 1840a1da115805274f2ea1fbf255ddf76b5e765c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 28 Oct 2019 17:27:52 +0300 Subject: [PATCH 07/11] Add comments regarding 31 days per month by default --- .../spark/sql/catalyst/util/IntervalUtils.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index b37e8dae38f9..8f4624d21504 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -127,7 +127,11 @@ object IntervalUtils { * * @param cal - the interval to get duration * @param targetUnit - time units of the result - * @param daysPerMonth - the number of days per one month + * @param daysPerMonth - the number of days per one month. The default value is 31 days + * per month. This value was taken as the default because it is used + * in Structured Streaming for watermark calculations. Having 31 days + * per month, we can guarantee that events are not dropped before + * the end of any month (February with 29 days or January with 31 days). * @return duration in the specified time units */ def getDuration( @@ -142,8 +146,12 @@ object IntervalUtils { /** * Checks the interval is negative * - * @param cal - the checked interval - * @param daysPerMonth - the number of days per one month + * @param cal - the checked interval + * @param daysPerMonth - the number of days per one month. The default value is 31 days + * per month. This value was taken as the default because it is used + * in Structured Streaming for watermark calculations. Having 31 days + * per month, we can guarantee that events are not dropped before + * the end of any month (February with 29 days or January with 31 days). * @return true if duration of the given interval is less than 0 otherwise false */ def isNegative(cal: CalendarInterval, daysPerMonth: Int = 31): Boolean = { From 4dab9067f62b56918ab86c9692f65fb393b87b4c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 28 Oct 2019 21:03:33 +0300 Subject: [PATCH 08/11] Remove a comment in getDelayMs() --- .../spark/sql/catalyst/plans/logical/EventTimeWatermark.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index fd48b4b291aa..b6bf7cd85d47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -29,7 +29,6 @@ object EventTimeWatermark { val delayKey = "spark.watermarkDelayMs" def getDelayMs(delay: CalendarInterval): Long = { - // We define month as `31 days` to simplify calculation. 
IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS) } } From b21297666bb838c3f53193dc9583b60350c09fd8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 28 Oct 2019 21:06:06 +0300 Subject: [PATCH 09/11] cal -> interval --- .../spark/sql/catalyst/util/IntervalUtils.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index 8f4624d21504..7c477132e478 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -125,7 +125,7 @@ object IntervalUtils { /** * Gets interval duration * - * @param cal - the interval to get duration + * @param interval - the interval to get duration * @param targetUnit - time units of the result * @param daysPerMonth - the number of days per one month. The default value is 31 days * per month. This value was taken as the default because it is used @@ -135,18 +135,20 @@ object IntervalUtils { * @return duration in the specified time units */ def getDuration( - cal: CalendarInterval, + interval: CalendarInterval, targetUnit: TimeUnit, daysPerMonth: Int = 31): Long = { - val monthsDuration = Math.multiplyExact(daysPerMonth * DateTimeUtils.MICROS_PER_DAY, cal.months) - val result = Math.addExact(cal.microseconds, monthsDuration) + val monthsDuration = Math.multiplyExact( + daysPerMonth * DateTimeUtils.MICROS_PER_DAY, + interval.months) + val result = Math.addExact(interval.microseconds, monthsDuration) targetUnit.convert(result, TimeUnit.MICROSECONDS) } /** * Checks the interval is negative * - * @param cal - the checked interval + * @param interval - the checked interval * @param daysPerMonth - the number of days per one month. The default value is 31 days * per month. This value was taken as the default because it is used * in Structured Streaming for watermark calculations. Having 31 days @@ -154,7 +156,7 @@ object IntervalUtils { * the end of any month (February with 29 days or January with 31 days). 
* @return true if duration of the given interval is less than 0 otherwise false */ - def isNegative(cal: CalendarInterval, daysPerMonth: Int = 31): Boolean = { - getDuration(cal, TimeUnit.MICROSECONDS, daysPerMonth) < 0 + def isNegative(interval: CalendarInterval, daysPerMonth: Int = 31): Boolean = { + getDuration(interval, TimeUnit.MICROSECONDS, daysPerMonth) < 0 } } From 8c2bcb1424d6269bcad922acbe0d0e3ac10fe60f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 29 Oct 2019 21:47:29 +0300 Subject: [PATCH 10/11] remove unused import --- .../scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index 8c0c534ef919..b8d3a838ddca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.util import java.util.concurrent.TimeUnit -import java.util.regex.Pattern import scala.util.control.NonFatal From edad25e70cb07319acc4235b603fc5cd16ed2433 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 29 Oct 2019 21:56:18 +0300 Subject: [PATCH 11/11] refactor comments --- .../sql/catalyst/util/IntervalUtils.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index b8d3a838ddca..23b9e3f4404c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -321,14 +321,14 @@ object IntervalUtils { /** * Gets interval duration * - * @param interval - the interval to get duration - * @param targetUnit - time units of the result - * @param daysPerMonth - the number of days per one month. The default value is 31 days - * per month. This value was taken as the default because it is used - * in Structured Streaming for watermark calculations. Having 31 days - * per month, we can guarantee that events are not dropped before - * the end of any month (February with 29 days or January with 31 days). - * @return duration in the specified time units + * @param interval The interval to get duration + * @param targetUnit Time units of the result + * @param daysPerMonth The number of days per one month. The default value is 31 days + * per month. This value was taken as the default because it is used + * in Structured Streaming for watermark calculations. Having 31 days + * per month, we can guarantee that events are not dropped before + * the end of any month (February with 29 days or January with 31 days). + * @return Duration in the specified time units */ def getDuration( interval: CalendarInterval, @@ -344,12 +344,12 @@ object IntervalUtils { /** * Checks the interval is negative * - * @param interval - the checked interval - * @param daysPerMonth - the number of days per one month. The default value is 31 days - * per month. This value was taken as the default because it is used - * in Structured Streaming for watermark calculations. Having 31 days - * per month, we can guarantee that events are not dropped before - * the end of any month (February with 29 days or January with 31 days). 
+ * @param interval The checked interval + * @param daysPerMonth The number of days per one month. The default value is 31 days + * per month. This value was taken as the default because it is used + * in Structured Streaming for watermark calculations. Having 31 days + * per month, we can guarantee that events are not dropped before + * the end of any month (February with 29 days or January with 31 days). * @return true if duration of the given interval is less than 0 otherwise false */ def isNegative(interval: CalendarInterval, daysPerMonth: Int = 31): Boolean = {
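
For reference, below is a minimal usage sketch of the IntervalUtils helpers in their final shape after PATCH 11/11. It is not part of the patch series itself: the object name IntervalUtilsExample is illustrative only, and the sketch assumes the spark-catalyst and spark-unsafe classes shown in the diffs are on the classpath. The expected values mirror assertions from IntervalUtilsSuite.

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.unsafe.types.CalendarInterval

object IntervalUtilsExample {
  def main(args: Array[String]): Unit = {
    // With the default 31 days per month, "1 month -30 days" lasts exactly one day.
    val oneDay = CalendarInterval.fromString("1 month -30 days")
    assert(IntervalUtils.getDuration(oneDay, TimeUnit.DAYS) == 1L)

    // The same interval is not negative, so Dataset.withWatermark would accept it as a delay.
    assert(!IntervalUtils.isNegative(oneDay))

    // "-1 month 30 days" falls one day short of a month and is therefore negative;
    // GroupStateImpl.parseDuration would reject such a value as a timeout duration.
    assert(IntervalUtils.isNegative(CalendarInterval.fromString("-1 month 30 days")))

    // Durations can also be requested in other units, as EventTimeWatermark.getDelayMs does.
    val tenSeconds = CalendarInterval.fromCaseInsensitiveString("10 seconds")
    assert(IntervalUtils.getDuration(tenSeconds, TimeUnit.MILLISECONDS) == 10000L)
  }
}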