Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,13 @@ case class TimeWindow(
if (slideDuration <= 0) {
return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
}
if (startTime < 0) {
return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.")
}
if (slideDuration > windowDuration) {
return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
s" to the windowDuration ($windowDuration).")
}
if (startTime >= slideDuration) {
return TypeCheckFailure(s"The start time ($startTime) must be less than the " +
s"slideDuration ($slideDuration).")
if (startTime.abs >= slideDuration) {
return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " +
s"than the slideDuration ($slideDuration).")
}
}
dataTypeCheck
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,28 @@ class AnalysisErrorSuite extends AnalysisTest {
"start time greater than slide duration in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")),
"The start time " :: " must be less than the slideDuration " :: Nil
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
)

errorTest(
"start time equal to slide duration in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")),
"The start time " :: " must be less than the slideDuration " :: Nil
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
)

// SPARK-21590: the last TimeWindow argument (the start time) is "-1 minute", whose absolute
// value exceeds the "1 second" slide duration, so the expected message fragments are those of
// the absolute-value start time check.
// NOTE(review): errorTest is defined outside this view; presumably it asserts that analysis
// fails with a message containing every listed fragment -- confirm against its definition.
errorTest(
"SPARK-21590: absolute value of start time greater than slide duration in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 minute").as("window")),
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
)

// SPARK-21590: boundary case where the absolute value of the start time ("-1 second") is
// exactly equal to the "1 second" slide duration; the same absolute-value start time message
// fragments are expected as in the strictly-greater case.
// NOTE(review): errorTest is defined outside this view; presumably it asserts that analysis
// fails with a message containing every listed fragment -- confirm against its definition.
errorTest(
"SPARK-21590: absolute value of start time equal to slide duration in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 second").as("window")),
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
)

errorTest(
Expand Down Expand Up @@ -372,13 +386,6 @@ class AnalysisErrorSuite extends AnalysisTest {
"The slide duration" :: " must be greater than 0." :: Nil
)

errorTest(
"negative start time in time window",
testRelation.select(
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")),
"The start time" :: "must be greater than or equal to 0." :: Nil
)

errorTest(
"generator nested in expressions",
listRelation.select(Explode('list) + 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with Priva
}
}

test("SPARK-21590: Start time works with negative values and return microseconds") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a DataFrame test for this with a negative value? Then I'll feel a lot more comfortable that the start offset does not have to be a positive number in the window calculation.
See DataFrameTimeWindowingSuite.scala.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah thanks, I added some tests in DataFrameTimeWindowingSuite

val validDuration = "10 minutes"
// Each pair is (start time text, expected start time in microseconds). Every expected value is
// written as a Long literal so the tuple element type is inferred as Long instead of being
// widened to AnyVal by mixing Int and Long literals.
for ((text, expectedMicros) <- Seq(
    ("-10 seconds", -10000000L), // -1e7
    ("-1 minute", -60000000L), // -6e7
    ("-1 hour", -3600000000L))) { // -3.6e9
  // The "interval "-prefixed form and the bare form must both yield the same start time.
  assert(TimeWindow(Literal(10L), validDuration, validDuration, "interval " + text).startTime
    === expectedMicros)
  assert(TimeWindow(Literal(10L), validDuration, validDuration, text).startTime
    === expectedMicros)
}
}

private val parseExpression = PrivateMethod[Long]('parseExpression)

test("parse sql expression for duration in microseconds - string") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
)
}

test("SPARK-21590: tumbling window using negative start time") {
  // Two events five seconds apart; with the -5 second start time the expected answer places
  // both of them in the single window [19:39:25, 19:39:35).
  val events = Seq(
    ("2016-03-27 19:39:30", 1, "a"),
    ("2016-03-27 19:39:25", 2, "a")).toDF("time", "value", "id")

  val shiftedWindow = window($"time", "10 seconds", "10 seconds", "-5 seconds")
  val countsPerWindow = events
    .groupBy(shiftedWindow)
    .agg(count("*").as("counts"))
    .orderBy($"window.start".asc)
    .select($"window.start".cast("string"), $"window.end".cast("string"), $"counts")

  val expected = Seq(Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 2))
  checkAnswer(countsPerWindow, expected)
}

test("tumbling window groupBy statement") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
Expand Down Expand Up @@ -72,6 +88,20 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
Seq(Row(1), Row(1), Row(1)))
}

test("SPARK-21590: tumbling window groupBy statement with negative startTime") {
  val events = Seq(
    ("2016-03-27 19:39:34", 1, "a"),
    ("2016-03-27 19:39:56", 2, "a"),
    ("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")

  // Group by the window shifted by -5 seconds together with the id column; the expected
  // answer is three groups that each hold exactly one event.
  val shiftedWindow = window($"time", "10 seconds", "10 seconds", "-5 seconds")
  val countsPerGroup = events
    .groupBy(shiftedWindow, $"id")
    .agg(count("*").as("counts"))
    .orderBy($"window.start".asc)
    .select("counts")

  checkAnswer(countsPerGroup, Seq.fill(3)(Row(1)))
}

test("tumbling window with multi-column projection") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
Expand Down Expand Up @@ -309,4 +339,19 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
)
}
}

test("SPARK-21590: time window in SQL with three expressions including negative start time") {
  withTempTable { table =>
    // Window duration as a string, slide as a bare number and a negative start time string.
    // NOTE(review): the numeric slide 10000000 presumably denotes microseconds (10 seconds);
    // the rows come from withTempTable, which is defined outside this view.
    val query =
      s"""select window(time, "10 seconds", 10000000, "-5 seconds"), value from $table"""
    val windowed = spark.sql(query)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value")

    val expected = Seq(
      Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
      Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
      Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2))

    checkAnswer(windowed, expected)
  }
}
}