Skip to content

Commit 9fb6b83

Browse files
adrian-wangrxin
authored andcommitted
[SPARK-8192] [SPARK-8193] [SQL] udf current_date, current_timestamp
Author: Daoyuan Wang <[email protected]> Closes #6985 from adrian-wang/udfcurrent and squashes the following commits: 6a20b64 [Daoyuan Wang] remove codegen and add lazy in testsuite 27c9f95 [Daoyuan Wang] refine tests.. e11ae75 [Daoyuan Wang] refine tests 61ed3d5 [Daoyuan Wang] add in functions 98e8550 [Daoyuan Wang] fix sytle 427d9dc [Daoyuan Wang] add tests and codegen 0b69a1f [Daoyuan Wang] udf current
1 parent 4a22bce commit 9fb6b83

File tree

5 files changed

+159
-1
lines changed

5 files changed

+159
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ object FunctionRegistry {
163163
expression[Substring]("substring"),
164164
expression[Upper]("ucase"),
165165
expression[UnHex]("unhex"),
166-
expression[Upper]("upper")
166+
expression[Upper]("upper"),
167+
168+
// datetime functions
169+
expression[CurrentDate]("current_date"),
170+
expression[CurrentTimestamp]("current_timestamp")
167171
)
168172

169173
val builtin: FunctionRegistry = {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
21+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
22+
import org.apache.spark.sql.types._
23+
24+
/**
25+
* Returns the current date at the start of query evaluation.
26+
* All calls of current_date within the same query return the same value.
27+
*/
28+
case class CurrentDate() extends LeafExpression {
29+
override def foldable: Boolean = true
30+
override def nullable: Boolean = false
31+
32+
override def dataType: DataType = DateType
33+
34+
override def eval(input: InternalRow): Any = {
35+
DateTimeUtils.millisToDays(System.currentTimeMillis())
36+
}
37+
}
38+
39+
/**
40+
* Returns the current timestamp at the start of query evaluation.
41+
* All calls of current_timestamp within the same query return the same value.
42+
*/
43+
case class CurrentTimestamp() extends LeafExpression {
44+
override def foldable: Boolean = true
45+
override def nullable: Boolean = false
46+
47+
override def dataType: DataType = TimestampType
48+
49+
override def eval(input: InternalRow): Any = {
50+
System.currentTimeMillis() * 10000L
51+
}
52+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
22+
23+
class DatetimeFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
24+
test("datetime function current_date") {
25+
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
26+
val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
27+
val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis())
28+
assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
29+
}
30+
31+
test("datetime function current_timestamp") {
32+
val ct = DateTimeUtils.toJavaTimestamp(CurrentTimestamp().eval(EmptyRow).asInstanceOf[Long])
33+
val t1 = System.currentTimeMillis()
34+
assert(math.abs(t1 - ct.getTime) < 5000)
35+
}
36+
37+
}

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.util.Utils
3535
*
3636
* @groupname udf_funcs UDF functions
3737
* @groupname agg_funcs Aggregate functions
38+
* @groupname datetime_funcs Date time functions
3839
* @groupname sort_funcs Sorting functions
3940
* @groupname normal_funcs Non-aggregate functions
4041
* @groupname math_funcs Math functions
@@ -991,6 +992,22 @@ object functions {
991992
*/
992993
def cosh(columnName: String): Column = cosh(Column(columnName))
993994

995+
/**
996+
* Returns the current date.
997+
*
998+
* @group datetime_funcs
999+
* @since 1.5.0
1000+
*/
1001+
def current_date(): Column = CurrentDate()
1002+
1003+
/**
1004+
* Returns the current timestamp.
1005+
*
1006+
* @group datetime_funcs
1007+
* @since 1.5.0
1008+
*/
1009+
def current_timestamp(): Column = CurrentTimestamp()
1010+
9941011
/**
9951012
* Computes the exponential of the given value.
9961013
*
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
21+
import org.apache.spark.sql.functions._
22+
23+
class DatetimeExpressionsSuite extends QueryTest {
24+
private lazy val ctx = org.apache.spark.sql.test.TestSQLContext
25+
26+
import ctx.implicits._
27+
28+
lazy val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
29+
30+
test("function current_date") {
31+
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
32+
val d1 = DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
33+
val d2 = DateTimeUtils.fromJavaDate(
34+
ctx.sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
35+
val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
36+
assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
37+
}
38+
39+
test("function current_timestamp") {
40+
checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))
41+
// Execution in one query should return the same value
42+
checkAnswer(ctx.sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""),
43+
Row(true))
44+
assert(math.abs(ctx.sql("""SELECT CURRENT_TIMESTAMP()""").collect().head.getTimestamp(
45+
0).getTime - System.currentTimeMillis()) < 5000)
46+
}
47+
48+
}

0 commit comments

Comments
 (0)