Commit 5e9a155

MaxGekk authored and cloud-fan committed
[SPARK-29520][SS] Fix checks of negative intervals
### What changes were proposed in this pull request? - Added `getDuration()` to calculate interval duration in specified time units assuming provided days per months - Added `isNegative()` which return `true` is the interval duration is less than 0 - Fix checking negative intervals by using `isNegative()` in structured streaming classes - Fix checking of `year-months` intervals ### Why are the changes needed? This fixes incorrect checking of negative intervals. An interval is negative when its duration is negative but not if interval's months **or** microseconds is negative. Also this fixes checking of `year-month` interval support because the `month` field could be negative. ### Does this PR introduce any user-facing change? Should not ### How was this patch tested? - Added tests for the `getDuration()` and `isNegative()` methods to `IntervalUtilsSuite` - By existing SS tests Closes #26177 from MaxGekk/interval-is-positive. Authored-by: Maxim Gekk <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 095f7b0 commit 5e9a155
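
A quick sketch of the new semantics, using the `IntervalUtils` methods added in this PR (the interval strings are illustrative only):

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.catalyst.util.IntervalUtils.fromString

// "-1 month 35 days" has a negative months field, but its total duration under
// the default 31 days/month is +4 days, so it is not a negative interval.
val interval = fromString("-1 month 35 days")
assert(!IntervalUtils.isNegative(interval))
assert(IntervalUtils.getDuration(interval, TimeUnit.DAYS) == 4L)

// "1 month -32 days" has a non-negative months field but a total duration of
// -1 day, so isNegative() flags it.
assert(IntervalUtils.isNegative(fromString("1 month -32 days")))
```

The old checks inspected the `months` and `microseconds`/`milliseconds` fields separately, so an interval like the first one above would have been rejected by, e.g., `Dataset.withWatermark` even though its duration is positive.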

9 files changed (+90, -15 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala

Lines changed: 1 addition & 1 deletion
@@ -256,7 +256,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
             val castedLit = lit.dataType match {
               case CalendarIntervalType =>
                 val calendarInterval = lit.value.asInstanceOf[CalendarInterval]
-                if (calendarInterval.months > 0) {
+                if (calendarInterval.months != 0) {
                   invalid = true
                   logWarning(
                     s"Failed to extract state value watermark from condition $exprToCollectFrom " +

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ object TimeWindow {
    */
   private def getIntervalInMicroSeconds(interval: String): Long = {
     val cal = IntervalUtils.fromString(interval)
-    if (cal.months > 0) {
+    if (cal.months != 0) {
       throw new IllegalArgumentException(
         s"Intervals greater than a month is not supported ($interval).")
     }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala

Lines changed: 2 additions & 3 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -28,9 +29,7 @@ object EventTimeWatermark {
   val delayKey = "spark.watermarkDelayMs"
 
   def getDelayMs(delay: CalendarInterval): Long = {
-    // We define month as `31 days` to simplify calculation.
-    val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
-    delay.milliseconds + delay.months * millisPerMonth
+    IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS)
   }
 }
 
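
The delegation to `getDuration` keeps the previous 31-days-per-month convention, so watermark delays come out unchanged; a minimal sketch of the equivalence:

```scala
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.util.IntervalUtils

// "1 month" still maps to 31 days of delay:
// 31 * 24 * 60 * 60 * 1000 ms = 2,678,400,000 ms, the same value the removed
// inline formula produced.
val delayMs = EventTimeWatermark.getDelayMs(IntervalUtils.fromString("1 month"))
assert(delayMs == 2678400000L)
```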

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala

Lines changed: 39 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.util.regex.Pattern
+import java.util.concurrent.TimeUnit
 
 import scala.util.control.NonFatal
 
@@ -325,4 +325,42 @@ object IntervalUtils {
         "Interval string does not match second-nano format of ss.nnnnnnnnn")
     }
   }
+
+  /**
+   * Gets interval duration
+   *
+   * @param interval The interval to get duration
+   * @param targetUnit Time units of the result
+   * @param daysPerMonth The number of days per one month. The default value is 31 days
+   *                     per month. This value was taken as the default because it is used
+   *                     in Structured Streaming for watermark calculations. Having 31 days
+   *                     per month, we can guarantee that events are not dropped before
+   *                     the end of any month (February with 29 days or January with 31 days).
+   * @return Duration in the specified time units
+   */
+  def getDuration(
+      interval: CalendarInterval,
+      targetUnit: TimeUnit,
+      daysPerMonth: Int = 31): Long = {
+    val monthsDuration = Math.multiplyExact(
+      daysPerMonth * DateTimeUtils.MICROS_PER_DAY,
+      interval.months)
+    val result = Math.addExact(interval.microseconds, monthsDuration)
+    targetUnit.convert(result, TimeUnit.MICROSECONDS)
+  }
+
+  /**
+   * Checks the interval is negative
+   *
+   * @param interval The checked interval
+   * @param daysPerMonth The number of days per one month. The default value is 31 days
+   *                     per month. This value was taken as the default because it is used
+   *                     in Structured Streaming for watermark calculations. Having 31 days
+   *                     per month, we can guarantee that events are not dropped before
+   *                     the end of any month (February with 29 days or January with 31 days).
+   * @return true if duration of the given interval is less than 0 otherwise false
+   */
+  def isNegative(interval: CalendarInterval, daysPerMonth: Int = 31): Boolean = {
+    getDuration(interval, TimeUnit.MICROSECONDS, daysPerMonth) < 0
+  }
 }
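
A short sketch of how the `daysPerMonth` parameter and its 31-day default behave (the values are just worked arithmetic, not taken from the PR):

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.catalyst.util.IntervalUtils.fromString

// With the 31-day default, "1 month" converts to the longest possible calendar
// month: 31 days * 86,400 s/day = 2,678,400 s.
assert(IntervalUtils.getDuration(fromString("1 month"), TimeUnit.SECONDS) == 2678400L)

// A smaller daysPerMonth shortens the same interval accordingly.
assert(IntervalUtils.getDuration(fromString("1 month"), TimeUnit.DAYS, daysPerMonth = 28) == 28L)
```

Because the conversion uses `Math.multiplyExact`/`Math.addExact`, a duration that does not fit in a `Long` of microseconds raises `ArithmeticException` instead of silently wrapping, which is what the overflow test below relies on.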

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala

Lines changed: 36 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.IntervalUtils.{fromDayTimeString, fromString, fromYearMonthString}
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -148,4 +150,38 @@ class IntervalUtilsSuite extends SparkFunSuite {
       assert(e.getMessage.contains("Cannot support (interval"))
     }
   }
+
+  test("interval duration") {
+    def duration(s: String, unit: TimeUnit, daysPerMonth: Int): Long = {
+      IntervalUtils.getDuration(fromString(s), unit, daysPerMonth)
+    }
+
+    assert(duration("0 seconds", TimeUnit.MILLISECONDS, 31) === 0)
+    assert(duration("1 month", TimeUnit.DAYS, 31) === 31)
+    assert(duration("1 microsecond", TimeUnit.MICROSECONDS, 30) === 1)
+    assert(duration("1 month -30 days", TimeUnit.DAYS, 31) === 1)
+
+    try {
+      duration(Integer.MAX_VALUE + " month", TimeUnit.SECONDS, 31)
+      fail("Expected to throw an exception for the invalid input")
+    } catch {
+      case e: ArithmeticException =>
+        assert(e.getMessage.contains("overflow"))
+    }
+  }
+
+  test("negative interval") {
+    def isNegative(s: String, daysPerMonth: Int): Boolean = {
+      IntervalUtils.isNegative(fromString(s), daysPerMonth)
+    }
+
+    assert(isNegative("-1 months", 28))
+    assert(isNegative("-1 microsecond", 30))
+    assert(isNegative("-1 month 30 days", 31))
+    assert(isNegative("2 months -61 days", 30))
+    assert(isNegative("-1 year -2 seconds", 30))
+    assert(!isNegative("0 months", 28))
+    assert(!isNegative("1 year -360 days", 31))
+    assert(!isNegative("-1 year 380 days", 31))
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 1 deletion
@@ -732,7 +732,7 @@ class Dataset[T] private[sql](
             s"Unable to parse time delay '$delayThreshold'",
             cause = Some(e))
       }
-    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+    require(!IntervalUtils.isNegative(parsedDelay),
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
       EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
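
For the user-facing API, `withWatermark` now judges the delay by its total duration rather than by individual fields; a minimal sketch (`events` is a hypothetical streaming DataFrame with a timestamp column `eventTime`):

```scala
import org.apache.spark.sql.DataFrame

def withSmallDelay(events: DataFrame): DataFrame = {
  // "-1 month 32 days": the months field is negative, but the total duration is
  // +1 day (31 days per month), so require(!IntervalUtils.isNegative(parsedDelay), ...)
  // accepts it; the old per-field check (milliseconds >= 0 && months >= 0) rejected it.
  events.withWatermark("eventTime", "-1 month 32 days")
}
```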

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala

Lines changed: 3 additions & 4 deletions
@@ -161,12 +161,11 @@ private[sql] class GroupStateImpl[S] private(
 
   private def parseDuration(duration: String): Long = {
     val cal = IntervalUtils.fromString(duration)
-    if (cal.milliseconds < 0 || cal.months < 0) {
-      throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
+    if (IntervalUtils.isNegative(cal)) {
+      throw new IllegalArgumentException(s"Provided duration ($duration) is negative")
     }
 
-    val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
-    cal.milliseconds + cal.months * millisPerMonth
+    IntervalUtils.getDuration(cal, TimeUnit.MILLISECONDS)
   }
 
   private def checkTimeoutTimestampAllowed(): Unit = {
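
The interval used by the `FlatMapGroupsWithStateSuite` change below exercises exactly this path; a minimal sketch of the arithmetic `parseDuration` now performs, expressed through the public `IntervalUtils` methods since `parseDuration` itself is private:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.catalyst.util.IntervalUtils.fromString

// "-1 month 31 days 1 second": -31 days and +31 days cancel out under the
// 31-day default, leaving a 1,000 ms duration, so a timeout set when the
// batch timestamp is 1000 lands at 2000.
val cal = fromString("-1 month 31 days 1 second")
assert(!IntervalUtils.isNegative(cal))
assert(IntervalUtils.getDuration(cal, TimeUnit.MILLISECONDS) == 1000L)
```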

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ private object Triggers {
 
   def convert(interval: String): Long = {
     val cal = IntervalUtils.fromString(interval)
-    if (cal.months > 0) {
+    if (cal.months != 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }
     TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
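
Assuming `Trigger.ProcessingTime(String)` routes through `Triggers.convert` (as this file suggests), the tightened check now rejects any month or year component, including negative ones that previously slipped past `months > 0`; a hedged sketch:

```scala
import org.apache.spark.sql.streaming.Trigger

// Both of these now throw IllegalArgumentException("Doesn't support month or
// year interval: ..."), because the check is months != 0 rather than months > 0:
//   Trigger.ProcessingTime("1 month")   // rejected before and after this change
//   Trigger.ProcessingTime("-1 month")  // previously passed the months > 0 check

// Day/time intervals are unaffected.
val trigger = Trigger.ProcessingTime("10 seconds")
```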

sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala

Lines changed: 6 additions & 3 deletions
@@ -125,6 +125,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
     var state: GroupStateImpl[Int] = GroupStateImpl.createForStreaming(
       None, 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false, watermarkPresent = false)
     assert(state.getTimeoutTimestamp === NO_TIMESTAMP)
+    state.setTimeoutDuration("-1 month 31 days 1 second")
+    assert(state.getTimeoutTimestamp === 2000)
     state.setTimeoutDuration(500)
     assert(state.getTimeoutTimestamp === 1500) // can be set without initializing state
     testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
@@ -225,8 +227,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
     testIllegalTimeout {
       state.setTimeoutDuration("-1 month")
     }
+
     testIllegalTimeout {
-      state.setTimeoutDuration("1 month -1 day")
+      state.setTimeoutDuration("1 month -31 day")
     }
 
     state = GroupStateImpl.createForStreaming(
@@ -241,7 +244,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
       state.setTimeoutTimestamp(10000, "-1 month")
     }
     testIllegalTimeout {
-      state.setTimeoutTimestamp(10000, "1 month -1 day")
+      state.setTimeoutTimestamp(10000, "1 month -32 day")
     }
     testIllegalTimeout {
       state.setTimeoutTimestamp(new Date(-10000))
@@ -253,7 +256,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
       state.setTimeoutTimestamp(new Date(-10000), "-1 month")
     }
     testIllegalTimeout {
-      state.setTimeoutTimestamp(new Date(-10000), "1 month -1 day")
+      state.setTimeoutTimestamp(new Date(-10000), "1 month -32 day")
     }
   }
 
