Skip to content

Commit 14a8b37

Browse files
committed
function unix_timestamp, from_unixtime
1 parent 3bc7055 commit 14a8b37

File tree

5 files changed

+215
-3
lines changed

5 files changed

+215
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,15 @@ object FunctionRegistry {
218218
expression[DayOfMonth]("day"),
219219
expression[DayOfYear]("dayofyear"),
220220
expression[DayOfMonth]("dayofmonth"),
221+
expression[FromUnixTime]("from_unixtime"),
221222
expression[Hour]("hour"),
222223
expression[LastDay]("last_day"),
223224
expression[Minute]("minute"),
224225
expression[Month]("month"),
225226
expression[NextDay]("next_day"),
226227
expression[Quarter]("quarter"),
227228
expression[Second]("second"),
229+
expression[UnixTimestamp]("unix_timestamp"),
228230
expression[WeekOfYear]("weekofyear"),
229231
expression[Year]("year"),
230232

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20-
import java.sql.Date
2120
import java.text.SimpleDateFormat
2221
import java.util.{Calendar, TimeZone}
2322

@@ -28,6 +27,8 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
2827
import org.apache.spark.sql.types._
2928
import org.apache.spark.unsafe.types.UTF8String
3029

30+
import scala.util.Try
31+
3132
/**
3233
* Returns the current date at the start of query evaluation.
3334
* All calls of current_date within the same query return the same value.
@@ -254,18 +255,116 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
254255

255256
override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
256257
val sdf = new SimpleDateFormat(format.toString)
257-
UTF8String.fromString(sdf.format(new Date(timestamp.asInstanceOf[Long] / 1000)))
258+
UTF8String.fromString(sdf.format(new java.util.Date(timestamp.asInstanceOf[Long] / 1000)))
258259
}
259260

260261
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
261262
val sdf = classOf[SimpleDateFormat].getName
262263
defineCodeGen(ctx, ev, (timestamp, format) => {
263264
s"""UTF8String.fromString((new $sdf($format.toString()))
264-
.format(new java.sql.Date($timestamp / 1000)))"""
265+
.format(new java.sql.Timestamp($timestamp / 1000)))"""
265266
})
266267
}
267268
}
268269

270+
/**
271+
* Convert time string with given pattern
272+
* (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html])
273+
* to a Unix timestamp (in seconds); returns null if parsing fails.
274+
* If the second parameter is missing, use "yyyy-MM-dd HH:mm:ss".
275+
* If no parameters are provided, the first parameter defaults to current_timestamp.
276+
* If the first parameter is a Date or Timestamp instead of String, we will ignore the
277+
* second parameter.
278+
*/
279+
case class UnixTimestamp(nodes: Seq[Expression])
  extends BinaryExpression with ExpectsInputTypes {

  // Reject surplus arguments up front instead of silently ignoring everything after the
  // second one.
  require(nodes.length <= 2,
    s"unix_timestamp takes at most 2 arguments, but ${nodes.length} were provided")

  override def children: Seq[Expression] = left :: right :: Nil

  /** The value to convert; the current timestamp when no argument was supplied. */
  override def left: Expression = nodes.headOption.getOrElse(CurrentTimestamp())

  /** The parse pattern; "yyyy-MM-dd HH:mm:ss" when the caller omitted it. */
  override def right: Expression =
    if (nodes.length > 1) nodes(1) else Literal("yyyy-MM-dd HH:mm:ss")

  override def inputTypes: Seq[AbstractDataType] =
    Seq(TypeCollection(StringType, DateType, TimestampType), StringType)

  override def dataType: DataType = LongType

  /**
   * Interpreted evaluation. For Date and Timestamp inputs the pattern is ignored; for String
   * inputs any failure to build the formatter or to parse the text yields null.
   */
  override def nullSafeEval(time: Any, format: Any): Any = {
    left.dataType match {
      case DateType =>
        DateTimeUtils.daysToMillis(time.asInstanceOf[Int]) / 1000L
      case TimestampType =>
        // The internal value is scaled down by 10^6 to obtain whole seconds.
        time.asInstanceOf[Long] / 1000000L
      case StringType =>
        // The formatter is built inside the Try so that an invalid pattern produces null,
        // exactly like the generated code path, instead of throwing.
        Try {
          val sdf = new SimpleDateFormat(format.asInstanceOf[UTF8String].toString)
          sdf.parse(time.asInstanceOf[UTF8String].toString).getTime / 1000L
        }.getOrElse(null)
    }
  }

  /**
   * Generated-code counterpart of [[nullSafeEval]]; one code template per input type.
   */
  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    left.dataType match {
      case StringType =>
        val sdf = classOf[SimpleDateFormat].getName
        nullSafeCodeGen(ctx, ev, (string, format) => {
          // Catch Exception rather than Throwable: a bad pattern or unparsable text maps to
          // null, while fatal JVM errors are left to propagate.
          s"""
            try {
              ${ev.primitive} =
                (new $sdf($format.toString())).parse($string.toString()).getTime() / 1000L;
            } catch (java.lang.Exception e) {
              ${ev.isNull} = true;
            }
          """
        })
      case TimestampType =>
        defineCodeGen(ctx, ev, (timestamp, format) => {
          s"""$timestamp / 1000000L"""
        })
      case DateType =>
        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
        defineCodeGen(ctx, ev, (date, format) => {
          s"""$dtu.daysToMillis($date) / 1000L"""
        })
    }
  }
}
334+
335+
/**
336+
* Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string
337+
* representing the timestamp of that moment in the current system time zone in the given
338+
* format. If the format is missing, the pattern "yyyy-MM-dd HH:mm:ss" is used (e.g. "1970-01-01 00:00:00").
339+
*/
340+
case class FromUnixTime(nodes: Seq[Expression])
  extends BinaryExpression with ImplicitCastInputTypes {

  // Argument-count validation uses `require` rather than `assert`: assertions can be elided
  // at compile time and signal an internal error rather than a bad argument.
  require(nodes.nonEmpty && nodes.length <= 2,
    s"from_unixtime takes 1 or 2 arguments, but ${nodes.length} were provided")

  override def children: Seq[Expression] = left :: right :: Nil

  /** The number of seconds since the Unix epoch. */
  override def left: Expression = nodes.head

  /** The output pattern; "yyyy-MM-dd HH:mm:ss" when the caller omitted it. */
  override def right: Expression =
    if (nodes.length > 1) nodes(1) else Literal("yyyy-MM-dd HH:mm:ss")

  override def dataType: DataType = StringType

  override def inputTypes: Seq[AbstractDataType] = Seq(LongType, StringType)

  /** Interpreted evaluation: formats `time` (seconds) with the given pattern. */
  override protected def nullSafeEval(time: Any, format: Any): Any = {
    val sdf = new SimpleDateFormat(format.toString)
    // SimpleDateFormat works on milliseconds, so the seconds value is scaled up by 1000.
    UTF8String.fromString(sdf.format(new java.util.Date(time.asInstanceOf[Long] * 1000L)))
  }

  /** Generated-code counterpart of [[nullSafeEval]]. */
  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val sdf = classOf[SimpleDateFormat].getName
    defineCodeGen(ctx, ev, (seconds, format) => {
      s"""UTF8String.fromString((new $sdf($format.toString())).format(
        new java.sql.Timestamp($seconds * 1000L)))"""
    })
  }
}
367+
269368
/**
270369
* Returns the last day of the month which the date belongs to.
271370
*/

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
2222
import java.util.Calendar
2323

2424
import org.apache.spark.SparkFunSuite
25+
import org.apache.spark.sql.catalyst.InternalRow
2526
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2627
import org.apache.spark.sql.types.{StringType, TimestampType, DateType}
2728

@@ -274,4 +275,38 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
274275
NextDay(Literal(Date.valueOf("2015-07-23")), Literal("th")),
275276
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
276277
}
278+
279+
test("from_unixtime") {
  val defaultPattern = "yyyy-MM-dd HH:mm:ss"
  val millisPattern = "yyyy-MM-dd HH:mm:ss.SSS"

  // Expected text: the instant (given in milliseconds) rendered with the given pattern.
  def rendered(pattern: String, millis: Long): String =
    new SimpleDateFormat(pattern).format(new Timestamp(millis))

  // Single-argument form falls back to the default pattern.
  checkEvaluation(FromUnixTime(Literal(0L) :: Nil), rendered(defaultPattern, 0L))
  checkEvaluation(FromUnixTime(Literal(1000L) :: Nil), rendered(defaultPattern, 1000000L))

  // Two-argument form with an explicit pattern and a pre-epoch instant.
  checkEvaluation(
    FromUnixTime(Literal(-1000L) :: Literal(millisPattern) :: Nil),
    rendered(millisPattern, -1000000L))
}
288+
289+
test("unix_timestamp") {
  val defaultPattern = "yyyy-MM-dd HH:mm:ss"
  val millisPattern = "yyyy-MM-dd HH:mm:ss.SSS"
  val shortDatePattern = "yy-MM-dd"
  val day = Date.valueOf("2015-07-24")

  // Text form of a java.util.Date (or subclass) under the given pattern.
  def rendered(pattern: String, instant: java.util.Date): String =
    new SimpleDateFormat(pattern).format(instant)

  // Seconds since the epoch for the start of the given day.
  def secondsOf(d: Date): Long =
    DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(d)) / 1000L

  // String input parsed with the default pattern.
  checkEvaluation(
    UnixTimestamp(Literal(rendered(defaultPattern, new Timestamp(0))) :: Nil), 0L)
  checkEvaluation(
    UnixTimestamp(Literal(rendered(defaultPattern, new Timestamp(1000000))) :: Nil), 1000L)

  // Timestamp and Date inputs need no pattern.
  checkEvaluation(UnixTimestamp(Literal(new Timestamp(1000000)) :: Nil), 1000L)
  checkEvaluation(UnixTimestamp(Literal(day) :: Nil), secondsOf(day))

  // String input with explicit patterns.
  checkEvaluation(
    UnixTimestamp(
      Literal(rendered(millisPattern, new Timestamp(-1000000))) :: Literal(millisPattern) :: Nil),
    -1000L)
  checkEvaluation(
    UnixTimestamp(
      Literal(rendered(shortDatePattern, Date.valueOf("2015-07-24"))) ::
        Literal(shortDatePattern) :: Nil),
    secondsOf(Date.valueOf("2015-07-24")))

  // No-argument form: two consecutive evaluations are at most one second apart.
  val first = UnixTimestamp(Seq()).eval(InternalRow.empty).asInstanceOf[Long]
  val second = UnixTimestamp(Seq()).eval(InternalRow.empty).asInstanceOf[Long]
  assert(second - first <= 1)
}
311+
277312
}

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2091,6 +2091,44 @@ object functions {
20912091
*/
20922092
def weekofyear(columnName: String): Column = weekofyear(Column(columnName))
20932093

2094+
/**
2095+
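* Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone, in the format yyyy-MM-dd HH:mm:ss.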
2096+
* @group datetime_funcs
2097+
* @since 1.5.0
2098+
*/
2099+
def from_unixtime(ut: Column): Column = FromUnixTime(ut.expr :: Nil)
2100+
2101+
/**
2102+
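* Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone, in the given format.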
2103+
* @group datetime_funcs
2104+
* @since 1.5.0
2105+
*/
2106+
def from_unixtime(ut: Column, f: Column): Column = FromUnixTime(ut.expr :: f.expr :: Nil)
2107+
2108+
/**
2109+
* Gets current Unix timestamp in seconds.
2110+
* @group datetime_funcs
2111+
* @since 1.5.0
2112+
*/
2113+
def unix_timestamp(): Column = UnixTimestamp(Seq.empty[Expression])
2114+
2115+
/**
2116+
* Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds),
2117+
* using the default timezone and the default locale; returns null if parsing fails.
2118+
* @group datetime_funcs
2119+
* @since 1.5.0
2120+
*/
2121+
def unix_timestamp(s: Column): Column = UnixTimestamp(s.expr :: Nil)
2122+
2123+
/**
2124+
* Convert time string with given pattern
2125+
* (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html])
2126+
* to a Unix timestamp (in seconds); returns null if parsing fails.
2127+
* @group datetime_funcs
2128+
* @since 1.5.0
2129+
*/
2130+
def unix_timestamp(s: Column, p: Column): Column = UnixTimestamp(s.expr :: p.expr :: Nil)
2131+
20942132
//////////////////////////////////////////////////////////////////////////////////////////////
20952133
// Collection functions
20962134
//////////////////////////////////////////////////////////////////////////////////////////////

sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,4 +206,42 @@ class DateFunctionsSuite extends QueryTest {
206206
Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
207207
}
208208

209+
test("from_unixtime") {
  val defaultPattern = "yyyy-MM-dd HH:mm:ss"
  val millisPattern = "yyyy-MM-dd HH:mm:ss.SSS"
  val dashedPattern = "yy-MM-dd HH-mm-ss"

  // Expected row: the instant (given in milliseconds) rendered with the given pattern.
  def expected(pattern: String, millis: Long): Row =
    Row(new SimpleDateFormat(pattern).format(new Timestamp(millis)))

  // Column "a" holds seconds since the epoch, column "b" a per-row output pattern.
  val df = Seq((1000, millisPattern), (-1000, dashedPattern)).toDF("a", "b")

  // Default pattern.
  checkAnswer(
    df.select(from_unixtime(col("a"))),
    Seq(expected(defaultPattern, 1000000L), expected(defaultPattern, -1000000L)))

  // Pattern taken from a column, differing per row.
  checkAnswer(
    df.select(from_unixtime(col("a"), col("b"))),
    Seq(expected(millisPattern, 1000000L), expected(dashedPattern, -1000000L)))

  // Pattern supplied as a literal.
  checkAnswer(
    df.select(from_unixtime(col("a"), lit(dashedPattern))),
    Seq(expected(dashedPattern, 1000000L), expected(dashedPattern, -1000000L)))
}
225+
226+
test("unix_timestamp") {
  val date1 = Date.valueOf("2015-07-24")
  val date2 = Date.valueOf("2015-07-25")
  val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
  val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
  val slashed1 = "2015/07/24 10:00:00.5"
  val slashed2 = "2015/07/25 02:02:02.6"
  val plain1 = "2015-07-24 10:00:00"
  val plain2 = "2015-07-25 02:02:02"
  val slashedPattern = "yyyy/MM/dd HH:mm:ss.S"

  val df = Seq((date1, ts1, slashed1, plain1), (date2, ts2, slashed2, plain2))
    .toDF("d", "ts", "s", "ss")

  // Whole seconds of each reference value, one row per input row.
  val timestampSeconds = Seq(ts1, ts2).map(t => Row(t.getTime / 1000L))
  val dateSeconds = Seq(date1, date2).map(d => Row(d.getTime / 1000L))

  // Timestamp column, and string column in the default pattern.
  checkAnswer(df.select(unix_timestamp(col("ts"))), timestampSeconds)
  checkAnswer(df.select(unix_timestamp(col("ss"))), timestampSeconds)

  // Date column: the supplied pattern is ignored.
  checkAnswer(df.select(unix_timestamp(col("d"), lit(slashedPattern))), dateSeconds)

  // String column parsed with an explicit pattern.
  checkAnswer(df.select(unix_timestamp(col("s"), lit(slashedPattern))), timestampSeconds)
}
246+
209247
}

0 commit comments

Comments
 (0)