Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import java.io.Serializable;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -66,6 +67,10 @@ private static long toLong(String s) {
}
}

/**
* Convert a string to CalendarInterval. Return null if the input string is not a valid interval.
* This method is case-sensitive and all characters in the input string should be in lower case.
*/
public static CalendarInterval fromString(String s) {
if (s == null) {
return null;
Expand All @@ -87,6 +92,26 @@ public static CalendarInterval fromString(String s) {
}
}

/**
* Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and
* will throw IllegalArgumentException when the input string is not a valid interval.
*
* @throws IllegalArgumentException if the string is not a valid internal.
*/
public static CalendarInterval fromCaseInsensitiveString(String s) {
if (s == null || s.trim().isEmpty()) {
throw new IllegalArgumentException("Interval cannot be null or blank.");
}
String sInLowerCase = s.trim().toLowerCase(Locale.ROOT);
String interval =
sInLowerCase.startsWith("interval ") ? sInLowerCase : "interval " + sInLowerCase;
CalendarInterval cal = fromString(interval);
if (cal == null) {
throw new IllegalArgumentException("Invalid interval: " + s);
}
return cal;
}

public static long toLongWithRange(String fieldName,
String s, long minValue, long maxValue) throws IllegalArgumentException {
long result = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,31 @@ public void fromStringTest() {
assertNull(fromString(input));
}

@Test
public void fromCaseInsensitiveStringTest() {
for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) {
assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000));
}

for (String input : new String[]{null, "", " "}) {
try {
fromCaseInsensitiveString(input);
fail("Expected to throw an exception for the invalid input");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("cannot be null or blank"));
}
}

for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) {
try {
fromCaseInsensitiveString(input);
fail("Expected to throw an exception for the invalid input");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Invalid interval"));
}
}
}

@Test
public void fromYearMonthStringTest() {
String input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
Expand Down Expand Up @@ -104,20 +102,7 @@ object TimeWindow {
* precision.
*/
private def getIntervalInMicroSeconds(interval: String): Long = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
"The window duration, slide duration and start time cannot be null or blank.")
}
val intervalString = if (interval.startsWith("interval")) {
interval
} else {
"interval " + interval
}
val cal = CalendarInterval.fromString(intervalString)
if (cal == null) {
throw new IllegalArgumentException(
s"The provided interval ($interval) did not correspond to a valid interval string.")
}
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
Expand Down
10 changes: 8 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,14 @@ class Dataset[T] private[sql](
// defined on a derived column cannot referenced elsewhere in the plan.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
try {
CalendarInterval.fromCaseInsensitiveString(delayThreshold)
} catch {
case e: IllegalArgumentException =>
throw new AnalysisException(
s"Unable to parse time delay '$delayThreshold'",
cause = Some(e))
}
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.streaming
import java.sql.Date
import java.util.concurrent.TimeUnit

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
import org.apache.spark.sql.execution.streaming.GroupStateImpl._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
Expand Down Expand Up @@ -161,20 +159,7 @@ private[sql] class GroupStateImpl[S] private(
def getTimeoutTimestamp: Long = timeoutTimestamp

private def parseDuration(duration: String): Long = {
if (StringUtils.isBlank(duration)) {
throw new IllegalArgumentException(
"Provided duration is null or blank.")
}
val intervalString = if (duration.startsWith("interval")) {
duration
} else {
"interval " + duration
}
val cal = CalendarInterval.fromString(intervalString)
if (cal == null) {
throw new IllegalArgumentException(
s"Provided duration ($duration) is not valid.")
}
val cal = CalendarInterval.fromCaseInsensitiveString(duration)
if (cal.milliseconds < 0 || cal.months < 0) {
throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval
Expand All @@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger {

private[sql] object ContinuousTrigger {
def apply(interval: String): ContinuousTrigger = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
"interval cannot be null or blank.")
}
val cal = if (interval.startsWith("interval")) {
CalendarInterval.fromString(interval)
} else {
CalendarInterval.fromString("interval " + interval)
}
if (cal == null) {
throw new IllegalArgumentException(s"Invalid interval: $interval")
}
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.Evolving
import org.apache.spark.unsafe.types.CalendarInterval

Expand Down Expand Up @@ -76,18 +74,7 @@ object ProcessingTime {
*/
@deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
def apply(interval: String): ProcessingTime = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
"interval cannot be null or blank.")
}
val cal = if (interval.startsWith("interval")) {
CalendarInterval.fromString(interval)
} else {
CalendarInterval.fromString("interval " + interval)
}
if (cal == null) {
throw new IllegalArgumentException(s"Invalid interval: $interval")
}
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
Expand Down