From 9aa1829a5c6b5b0d7646c241828e6f09041a1c77 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Sat, 16 May 2020 19:33:08 +0200
Subject: [PATCH 1/2] [SPARK-31735][CORE] Include date/timestamp in the summary report

Currently, date columns are missing from the summary report:

    from datetime import datetime, timedelta, timezone

    from pyspark.sql import types as T
    from pyspark.sql import Row
    from pyspark.sql import functions as F

    START = datetime(2014, 1, 1, tzinfo=timezone.utc)
    n_days = 22

    date_range = [Row(date=(START + timedelta(days=n))) for n in range(0, n_days)]

    schema = T.StructType([T.StructField(name="date", dataType=T.DateType(), nullable=False)])

    rdd = spark.sparkContext.parallelize(date_range)
    df = spark.createDataFrame(data=rdd, schema=schema)

    df.agg(F.max("date")).show()
    df.summary().show()

The date column is dropped entirely, leaving only the summary labels:

    +-------+
    |summary|
    +-------+
    |  count|
    |   mean|
    | stddev|
    |    min|
    |    25%|
    |    50%|
    |    75%|
    |    max|
    +-------+

It would be nice to include these columns as well.
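
In the meantime, a possible workaround (an untested sketch, not part of
this patch) is to cast the date column to string so that it passes the
existing StringType filter; min/max then compare ISO-formatted strings,
which sort the same way as the underlying dates:

    # Hypothetical workaround, reusing `df` and `F` from the snippet above.
    # The cast yields a StringType column, which summary() already includes;
    # count/min/max are meaningful, while mean/stddev stay null for strings.
    df.select(F.col("date").cast("string").alias("date")).summary().show()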

Signed-off-by: Fokko Driesprong
---
 .../org/apache/spark/sql/execution/stat/StatFunctions.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 5094e5eab5955..69439ca2f30ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -264,7 +264,10 @@ object StatFunctions extends Logging {
     }
 
     val selectedCols = ds.logicalPlan.output
-      .filter(a => a.dataType.isInstanceOf[NumericType] || a.dataType.isInstanceOf[StringType])
+      .filter(a => a.dataType.isInstanceOf[NumericType]
+        || a.dataType.isInstanceOf[StringType]
+        || a.dataType.isInstanceOf[DateType]
+        || a.dataType.isInstanceOf[TimestampType])
 
     val aggExprs = statisticFns.flatMap { func =>
       selectedCols.map(c => Column(Cast(func(c), StringType)).as(c.name))

From 51c3df1ecd3185478080041a560876b819456499 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 3 Jun 2020 14:32:56 +0200
Subject: [PATCH 2/2] Add a test

---
 .../org/apache/spark/sql/DataFrameSuite.scala | 34 +++++++++++-----------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 8359dff674a87..330cfb15fe6bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -832,30 +832,30 @@ class DataFrameSuite extends QueryTest
   }
 
   private lazy val person2: DataFrame = Seq(
-    ("Bob", 16, 176),
-    ("Alice", 32, 164),
-    ("David", 60, 192),
-    ("Amy", 24, 180)).toDF("name", "age", "height")
+    ("Bob", 16, 176, Date.valueOf("2020-01-01")),
+    ("Alice", 32, 164, Date.valueOf("2020-01-05")),
+    ("David", 60, 192, Date.valueOf("2020-01-19")),
+    ("Amy", 24, 180, Date.valueOf("2020-01-25"))).toDF("name", "age", "height", "birthday")
 
   test("describe") {
     val describeResult = Seq(
-      Row("count", "4", "4", "4"),
-      Row("mean", null, "33.0", "178.0"),
-      Row("stddev", null, "19.148542155126762", "11.547005383792516"),
-      Row("min", "Alice", "16", "164"),
-      Row("max", "David", "60", "192"))
+      Row("count", "4", "4", "4", "4"),
+      Row("mean", null, "33.0", "178.0", null),
+      Row("stddev", null, "19.148542155126762", "11.547005383792516", null),
+      Row("min", "Alice", "16", "164", "2020-01-01"),
+      Row("max", "David", "60", "192", "2020-01-25"))
 
     val emptyDescribeResult = Seq(
-      Row("count", "0", "0", "0"),
-      Row("mean", null, null, null),
-      Row("stddev", null, null, null),
-      Row("min", null, null, null),
-      Row("max", null, null, null))
+      Row("count", "0", "0", "0", "0"),
+      Row("mean", null, null, null, null),
+      Row("stddev", null, null, null, null),
+      Row("min", null, null, null, null),
+      Row("max", null, null, null, null))
 
     def getSchemaAsSeq(df: DataFrame): Seq[String] = df.schema.map(_.name)
 
     val describeAllCols = person2.describe()
-    assert(getSchemaAsSeq(describeAllCols) === Seq("summary", "name", "age", "height"))
+    assert(getSchemaAsSeq(describeAllCols) === Seq("summary", "name", "age", "height", "birthday"))
     checkAnswer(describeAllCols, describeResult)
     // All aggregate value should have been cast to string
     describeAllCols.collect().foreach { row =>
@@ -875,7 +875,7 @@ class DataFrameSuite extends QueryTest
-    checkAnswer(describeNoCol, describeResult.map { case Row(s, _, _, _) => Row(s)} )
+    checkAnswer(describeNoCol, describeResult.map { case Row(s, _, _, _, _) => Row(s)} )
 
     val emptyDescription = person2.limit(0).describe()
-    assert(getSchemaAsSeq(emptyDescription) === Seq("summary", "name", "age", "height"))
+    assert(getSchemaAsSeq(emptyDescription) === Seq("summary", "name", "age", "height", "birthday"))
     checkAnswer(emptyDescription, emptyDescribeResult)
   }
 