[SPARK-29520][SS] Fix checks of negative intervals #26177
| * @param daysPerMonth the number of days per one month | ||
| * @return true if duration of the given interval is less than 0 otherwise false | ||
| */ | ||
| public boolean isNegative(int daysPerMonth) { |
Is it necessary to expose this vs just letting callers get the duration and compare?
I added it for code readability at caller side.
Also it could be considered as a companion method for negate()
@srowen Do you think it is not worth it?
I'm neutral on it. It makes some sense; I'm just always reluctant to add API methods.
Can this be static?
Yes, it can.
Can the methods be package private?
Probably, yes.
Why does the caller define how many days are in a month?
Because the number of days per month is not constant. Structured Streaming follows a conservative approach and assumes 31 days per month (see #16304 (comment)). In other places, we can assume other numbers; see line 31 of spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala at 77fe8a8:
| val DAYS_PER_MONTH: Byte = 30 |
would that assumption not be fixed?
For now, the methods are used by SS only. Apparently 31 can be hard coded inside.
More generally, do we really need to allow for negative intervals at all? semantically it's nonnegative. Is it allowed in SQL?
The SQL standard allows negative intervals. Among other DBMSs, PostgreSQL allows them as well. If you subtract two timestamp columns, timestamp1 - timestamp2, what are you going to produce when timestamp1 < timestamp2?
Got it, yes makes sense about negative intervals as the necessary result of subtracting timestamps.
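To make the timestamp-subtraction point concrete, here is a minimal sketch using plain java.time rather than Spark's own types. The object and method names (TimestampDiffSketch, subtractMicros) are hypothetical and only illustrate why the result has to be allowed to go negative.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object TimestampDiffSketch {
  // Hypothetical helper: computes timestamp1 - timestamp2 in microseconds.
  // ChronoUnit.between(a, b) measures from a to b, so the arguments are swapped.
  def subtractMicros(timestamp1: Instant, timestamp2: Instant): Long =
    ChronoUnit.MICROS.between(timestamp2, timestamp1)

  val earlier: Instant = Instant.parse("2019-10-01T00:00:00Z")
  val later: Instant = Instant.parse("2019-10-02T00:00:00Z")

  // timestamp1 < timestamp2, so the difference is minus one day.
  val diff: Long = subtractMicros(earlier, later)
}
```

Here diff is negative because the first timestamp precedes the second; rejecting negative values would leave this subtraction without a representable result.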
The CalendarInterval class has many static methods that can be moved out of it to a separate class, for instance CalendarIntervalUtils. I would leave only milliseconds(), add(), subtract(), negate(), hashCode(), toString() in CalendarInterval, and put fromString(), fromCaseInsensitiveString(), fromYearMonthString(), etc. into CalendarIntervalUtils.
... or is any kind of refactoring already impossible in Spark because of the commit history tracking problem? @cloud-fan WDYT?
I wouldn't bother with a big refactoring here. Whatever is consistent. But yeah you can make it static at least?
I will move those 2 methods to IntervalUtils
Test build #112319 has finished for PR 26177 at commit
Test build #112321 has finished for PR 26177 at commit
I'm neutral on this PR.
I do believe the caller should not depend on how many fields an interval has: 2 now, 3 after your PR, or maybe 5 in the future. What matters is whether the user's interval is negative or not.
Users should struggle to pass the constraints. Any combination of non-negative interval should work.
Such an assumption has already been made in the Structured Streaming code: 31 days per month. Nothing more is needed. Splitting microseconds into days and microseconds should not require more assumptions.
@zsxwing Does this fix make sense, WDYT?
Test build #112488 has finished for PR 26177 at commit
srowen
left a comment
Looking reasonable to me.
| */ | ||
| def getDuration( | ||
| cal: CalendarInterval, | ||
| daysPerMonth: Int, |
This could also be an optional final argument and default to 31. For now, when it's always 31, maybe that's simpler.
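As a rough illustration of that suggestion, the sketch below makes daysPerMonth the last parameter with a default of 31. It is not the PR's code: Interval is a hypothetical two-field stand-in for CalendarInterval, and MicrosPerDay stands in for the constant Spark keeps in DateTimeUtils.

```scala
import java.util.concurrent.TimeUnit

object DefaultDaysPerMonthSketch {
  // Hypothetical stand-in for CalendarInterval (months and microseconds only).
  final case class Interval(months: Int, microseconds: Long)

  val MicrosPerDay: Long = TimeUnit.DAYS.toMicros(1)

  // daysPerMonth comes last and defaults to 31, so existing callers can omit it.
  def getDuration(
      interval: Interval,
      targetUnit: TimeUnit,
      daysPerMonth: Int = 31): Long = {
    // Exact arithmetic throws ArithmeticException on overflow instead of wrapping.
    val monthsDuration =
      Math.multiplyExact(daysPerMonth * MicrosPerDay, interval.months.toLong)
    val micros = Math.addExact(interval.microseconds, monthsDuration)
    targetUnit.convert(micros, TimeUnit.MICROSECONDS)
  }

  val oneMonth: Interval = Interval(months = 1, microseconds = 0L)
  val withDefault: Long = getDuration(oneMonth, TimeUnit.DAYS)       // uses 31
  val withThirty: Long = getDuration(oneMonth, TimeUnit.DAYS, 30)    // explicit override
}
```

A caller that needs a different month length passes it explicitly, while callers that accept the conservative assumption keep a shorter call site.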
Test build #112554 has finished for PR 26177 at commit
I'll start reviewing it after conflicts are resolved, thanks for fixing them!
| def getDuration( | ||
| cal: CalendarInterval, | ||
| targetUnit: TimeUnit, | ||
| daysPerMonth: Int = 31): Long = { |
where do we specify daysPerMonth?
Are there any other places where we need to get the duration of an interval, other than the stream watermark? If not, I think it's simpler to call it getDurationForWatermark and explain why we pick 1 month = 31 days, according to #16304 (comment)
What you propose is an ad-hoc interface that solves the current issue only. I don't like it because it makes the interval interface not reusable.
- Next time we need durations in another place, are you going to add getDurationForAnotherPlace1?
- If someone needs a duration with 31 days per month for a purpose other than watermarks, he/she cannot use it, even in Structured Streaming.
From my understanding, a watermark semantically has the type TIMESTAMP, not INTERVAL. It can probably even be negative. If we rename the method, it could be getDurationOfDelay or getDurationOfTimeout (or, following your proposal, getDurationOfDelayForEventTimeWatermark or getDurationOfTimeoutForGroupState).
| * @param daysPerMonth - the number of days per one month | ||
| * @return true if duration of the given interval is less than 0 otherwise false | ||
| */ | ||
| def isNegative(cal: CalendarInterval, daysPerMonth: Int = 31): Boolean = { |
ditto, we can call it isNegativeWatermark.
IMHO, this is a confusing method name. Here, we do not check any watermarks. I would understand isNegativeDelay.
I agree, this isn't specific to one usage. If it lives in a general class a general name is better
Test build #112671 has finished for PR 26177 at commit
| def getDuration( | ||
| cal: CalendarInterval, | ||
| targetUnit: TimeUnit, | ||
| daysPerMonth: Int = 31): Long = { |
Then we can remove the default value? The caller side should explicitly specify that it wants 31 days per month and why.
I suggested the default as that is the only value used now. I think callers that specify otherwise can explain why (later)
then at least we need to explain why 31 is picked as the default value
Test build #112774 has started for PR 26177 at commit
| val delayKey = "spark.watermarkDelayMs" | ||
|
|
||
| def getDelayMs(delay: CalendarInterval): Long = { | ||
| // We define month as `31 days` to simplify calculation. |
this comment can be removed, as no 31 can be seen here.
| /** | ||
| * Gets interval duration | ||
| * | ||
| * @param cal - the interval to get duration |
nit: seems "interval" is a better name.
| def convert(interval: String): Long = { | ||
| val cal = IntervalUtils.fromString(interval) | ||
| if (cal.months > 0) { | ||
| if (cal.months != 0) { |
This seems like another way of converting interval to duration: make sure the months field is 0. Shall we also take it into account in the new getDuration method?
We can change getDuration() to:

    def getDuration(
        interval: CalendarInterval,
        targetUnit: TimeUnit,
        daysPerMonth: Option[Int] = Some(31)): Long = {
      val monthsDuration = daysPerMonth
        .map { days =>
          Math.multiplyExact(days * DateTimeUtils.MICROS_PER_DAY, interval.months)
        }.getOrElse {
          if (interval.months == 0) {
            0L
          } else {
            throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
          }
        }
      val result = Math.addExact(interval.microseconds, monthsDuration)
      targetUnit.convert(result, TimeUnit.MICROSECONDS)
    }

and call getDuration(cal, TimeUnit.MILLISECONDS, None)
but I am not sure that this check should be inside of getDuration()
Test build #112790 has finished for PR 26177 at commit
retest this please
Test build #112827 has finished for PR 26177 at commit
retest this please
Test build #112833 has finished for PR 26177 at commit
…-positive # Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala # sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala
Test build #112864 has finished for PR 26177 at commit
@cloud-fan Could you take a look at this one more time, please.
thanks, merging to master!
What changes were proposed in this pull request?
- getDuration() to calculate the interval duration in the specified time units, assuming the provided number of days per month
- isNegative(), which returns true if the interval duration is less than 0
- isNegative() in structured streaming classes
- year-months intervals
Why are the changes needed?
This fixes incorrect checking of negative intervals. An interval is negative when its duration is negative, not when the interval's months or microseconds field is negative. This also fixes the check for year-month interval support, because the month field could be negative.
Does this PR introduce any user-facing change?
Should not
How was this patch tested?
getDuration() and isNegative() methods to IntervalUtilsSuite
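The distinction drawn in the description, between a negative duration and a negative field, can be sketched as follows. This is an illustration under stated assumptions rather than the merged code: Interval is a hypothetical stand-in for CalendarInterval, 31 days per month follows the Structured Streaming convention mentioned in the thread, and anyFieldNegative is a made-up field-by-field check included only for contrast.

```scala
import java.util.concurrent.TimeUnit

object NegativeIntervalSketch {
  // Hypothetical, simplified stand-in for CalendarInterval.
  final case class Interval(months: Int, microseconds: Long)

  val MicrosPerDay: Long = TimeUnit.DAYS.toMicros(1)

  // Total duration in microseconds, assuming a fixed number of days per month.
  def durationMicros(i: Interval, daysPerMonth: Int = 31): Long =
    Math.addExact(
      i.microseconds,
      Math.multiplyExact(daysPerMonth * MicrosPerDay, i.months.toLong))

  // Duration-based check: the interval as a whole points backwards in time.
  def isNegative(i: Interval, daysPerMonth: Int = 31): Boolean =
    durationMicros(i, daysPerMonth) < 0

  // Field-by-field check, shown only for contrast (hypothetical).
  def anyFieldNegative(i: Interval): Boolean =
    i.months < 0 || i.microseconds < 0

  // One month minus one day: a negative field, but a positive duration overall.
  val monthMinusDay: Interval = Interval(months = 1, microseconds = -MicrosPerDay)

  // Minus one month plus one day: a negative duration overall.
  val dayMinusMonth: Interval = Interval(months = -1, microseconds = MicrosPerDay)
}
```

For monthMinusDay the two checks disagree: the field-by-field check flags it, while the duration-based check accepts it because the total is 30 days. For dayMinusMonth both checks report a negative interval.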