From 1e1ad1970a8bf3d9076165074f18ee7f28ab4acd Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 8 Dec 2015 20:08:17 -0800 Subject: [PATCH 1/5] show decoded values. --- .../org/apache/spark/sql/DataFrame.scala | 57 ++--------------- .../scala/org/apache/spark/sql/Dataset.scala | 36 ++++++++++- .../spark/sql/execution/Queryable.scala | 64 +++++++++++++++++++ 3 files changed, 105 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 243a8c853f90e..8702e4cdbdaa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -161,17 +161,15 @@ class DataFrame private[sql]( } /** - * Compose the string representing rows for output - * @param _numRows Number of rows to show - * @param truncate Whether truncate long strings and align cells right - */ - private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { + * Compose the string representing rows for output + * @param _numRows Number of rows to show + * @param truncate Whether truncate long strings and align cells right + */ + override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) - val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) - val numCols = schema.fieldNames.length // For array values, replace Seq and Array with square brackets // For cells that are beyond 20 characters, replace it with the first 17 and "..." @@ -187,50 +185,7 @@ class DataFrame private[sql]( }: Seq[String] } - // Initialise the width of each column to a minimum value of '3' - val colWidths = Array.fill(numCols)(3) - - // Compute the width of each column - for (row <- rows) { - for ((cell, i) <- row.zipWithIndex) { - colWidths(i) = math.max(colWidths(i), cell.length) - } - } - - // Create SeparateLine - val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() - - // column names - rows.head.zipWithIndex.map { case (cell, i) => - if (truncate) { - StringUtils.leftPad(cell, colWidths(i)) - } else { - StringUtils.rightPad(cell, colWidths(i)) - } - }.addString(sb, "|", "|", "|\n") - - sb.append(sep) - - // data - rows.tail.map { - _.zipWithIndex.map { case (cell, i) => - if (truncate) { - StringUtils.leftPad(cell.toString, colWidths(i)) - } else { - StringUtils.rightPad(cell.toString, colWidths(i)) - } - }.addString(sb, "|", "|", "|\n") - } - - sb.append(sep) - - // For Data that has more than "numRows" records - if (hasMoreData) { - val rowsString = if (numRows == 1) "row" else "rows" - sb.append(s"only showing top $numRows $rowsString\n") - } - - sb.toString() + formatString ( rows, numRows, hasMoreData, truncate ) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3bd18a14f9e8f..2badcfd55452e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -225,7 +225,41 @@ class Dataset[T] private[sql]( * * @since 1.6.0 */ - def show(numRows: Int, truncate: Boolean): Unit = toDF().show(numRows, truncate) + // scalastyle:off println + def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate)) + // scalastyle:on println + + /** + * Compose the string representing rows for 
output + * @param _numRows Number of rows to show + * @param truncate Whether truncate long strings and align cells right + */ + override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { + val numRows = _numRows.max(0) + val takeResult = take(numRows + 1) + val hasMoreData = takeResult.length > numRows + val data = takeResult.take(numRows) + + // For array values, replace Seq and Array with square brackets + // For cells that are beyond 20 characters, replace it with the first 17 and "..." + val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: (data.map { + case r: Row => r + case tuple: Product => Row.fromTuple(tuple) + case o => Row(o) + } map { row => + row.toSeq.map { cell => + val str = cell match { + case null => "null" + case array: Array[_] => array.mkString("[", ", ", "]") + case seq: Seq[_] => seq.mkString("[", ", ", "]") + case _ => cell.toString + } + if (truncate && str.length > 20) str.substring(0, 17) + "..." else str + }: Seq[String] + }) + + formatString ( rows, numRows, hasMoreData, truncate ) + } /** * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala index f2f5997d1b7c6..668acef02ab3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import scala.util.control.NonFatal +import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.StructType @@ -42,4 +43,67 @@ private[sql] trait Queryable { def explain(extended: Boolean): Unit def explain(): Unit + + private[sql] def showString(_numRows: Int, truncate: Boolean = true): String + + /** + * Format the string representing rows for output + * @param rows The rows to show + * @param numRows Number of rows to show + * @param hasMoreData Whether some rows are not shown due to the limit + * @param truncate Whether truncate long strings and align cells right + * + */ + private[sql] def formatString (rows: Seq[Seq[String]], + numRows: Int, + hasMoreData : Boolean, + truncate: Boolean = true): String = { + val sb = new StringBuilder + val numCols = schema.fieldNames.length + + // Initialise the width of each column to a minimum value of '3' + val colWidths = Array.fill(numCols)(3) + + // Compute the width of each column + for (row <- rows) { + for ((cell, i) <- row.zipWithIndex) { + colWidths(i) = math.max(colWidths(i), cell.length) + } + } + + // Create SeparateLine + val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString() + + // column names + rows.head.zipWithIndex.map { case (cell, i) => + if (truncate) { + StringUtils.leftPad(cell, colWidths(i)) + } else { + StringUtils.rightPad(cell, colWidths(i)) + } + }.addString(sb, "|", "|", "|\n") + + sb.append(sep) + + // data + rows.tail.map { + _.zipWithIndex.map { case (cell, i) => + if (truncate) { + StringUtils.leftPad(cell.toString, colWidths(i)) + } else { + StringUtils.rightPad(cell.toString, colWidths(i)) + } + }.addString(sb, "|", "|", "|\n") + } + + sb.append(sep) + + // For Data that has more than "numRows" records + if (hasMoreData) { + val rowsString = if (numRows == 1) "row" else "rows" + sb.append(s"only showing top $numRows $rowsString\n") + } + + sb.toString() + } } From 48e049b843582641103ad5befc2f6d3355d3bab8 Mon 
Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 11 Dec 2015 13:52:25 -0800 Subject: [PATCH 2/5] fixed the indenting issue. --- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 8702e4cdbdaa0..94ce4bb0adb81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -161,10 +161,10 @@ class DataFrame private[sql]( } /** - * Compose the string representing rows for output - * @param _numRows Number of rows to show - * @param truncate Whether truncate long strings and align cells right - */ + * Compose the string representing rows for output + * @param _numRows Number of rows to show + * @param truncate Whether truncate long strings and align cells right + */ override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val takeResult = take(numRows + 1) From ad295d3a4f5e11add109fb8379fca29b41714aa3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Dec 2015 17:41:03 -0800 Subject: [PATCH 3/5] fixing indenting issues. --- .../scala/org/apache/spark/sql/Dataset.scala | 66 +++++++++---------- .../spark/sql/execution/Queryable.scala | 23 +++---- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 2badcfd55452e..8392a23a5b076 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -230,10 +230,10 @@ class Dataset[T] private[sql]( // scalastyle:on println /** - * Compose the string representing rows for output - * @param _numRows Number of rows to show - * @param truncate Whether truncate long strings and align cells right - */ + * Compose the string representing rows for output + * @param _numRows Number of rows to show + * @param truncate Whether truncate long strings and align cells right + */ override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val takeResult = take(numRows + 1) @@ -262,20 +262,20 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. - * @since 1.6.0 - */ + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * @since 1.6.0 + */ def repartition(numPartitions: Int): Dataset[T] = withPlan { Repartition(numPartitions, shuffle = true, _) } /** - * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. - * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. - * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. - * @since 1.6.0 - */ + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. 
+ * @since 1.6.0 + */ def coalesce(numPartitions: Int): Dataset[T] = withPlan { Repartition(numPartitions, shuffle = false, _) } @@ -732,47 +732,47 @@ class Dataset[T] private[sql]( def takeAsList(num: Int): java.util.List[T] = java.util.Arrays.asList(take(num) : _*) /** - * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). - * @since 1.6.0 - */ + * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). + * @since 1.6.0 + */ def persist(): this.type = { sqlContext.cacheManager.cacheQuery(this) this } /** - * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). - * @since 1.6.0 - */ + * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). + * @since 1.6.0 + */ def cache(): this.type = persist() /** - * Persist this [[Dataset]] with the given storage level. - * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, - * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, - * `MEMORY_AND_DISK_2`, etc. - * @group basic - * @since 1.6.0 - */ + * Persist this [[Dataset]] with the given storage level. + * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, + * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, + * `MEMORY_AND_DISK_2`, etc. + * @group basic + * @since 1.6.0 + */ def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) this } /** - * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. - * @param blocking Whether to block until all blocks are deleted. - * @since 1.6.0 - */ + * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. + * @param blocking Whether to block until all blocks are deleted. + * @since 1.6.0 + */ def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) this } /** - * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. - * @since 1.6.0 - */ + * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. 
+ * @since 1.6.0 + */ def unpersist(): this.type = unpersist(blocking = false) /* ******************** * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala index 668acef02ab3f..b397d42612cf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala @@ -47,17 +47,18 @@ private[sql] trait Queryable { private[sql] def showString(_numRows: Int, truncate: Boolean = true): String /** - * Format the string representing rows for output - * @param rows The rows to show - * @param numRows Number of rows to show - * @param hasMoreData Whether some rows are not shown due to the limit - * @param truncate Whether truncate long strings and align cells right - * - */ - private[sql] def formatString (rows: Seq[Seq[String]], - numRows: Int, - hasMoreData : Boolean, - truncate: Boolean = true): String = { + * Format the string representing rows for output + * @param rows The rows to show + * @param numRows Number of rows to show + * @param hasMoreData Whether some rows are not shown due to the limit + * @param truncate Whether truncate long strings and align cells right + * + */ + private[sql] def formatString ( + rows: Seq[Seq[String]], + numRows: Int, + hasMoreData : Boolean, + truncate: Boolean = true): String = { val sb = new StringBuilder val numCols = schema.fieldNames.length From 1c3344169d9c17239a807ccfb550239823322612 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Dec 2015 22:44:19 -0800 Subject: [PATCH 4/5] added test cases. --- .../scala/org/apache/spark/sql/DataFrame.scala | 1 + .../main/scala/org/apache/spark/sql/Dataset.scala | 1 + .../org/apache/spark/sql/DataFrameSuite.scala | 15 +++++++++++++++ .../scala/org/apache/spark/sql/DatasetSuite.scala | 14 ++++++++++++++ 4 files changed, 31 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 94ce4bb0adb81..2bcc499726fc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -177,6 +177,7 @@ class DataFrame private[sql]( row.toSeq.map { cell => val str = cell match { case null => "null" + case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]") case array: Array[_] => array.mkString("[", ", ", "]") case seq: Seq[_] => seq.mkString("[", ", ", "]") case _ => cell.toString diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 8392a23a5b076..3d7915d62ebeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -250,6 +250,7 @@ class Dataset[T] private[sql]( row.toSeq.map { cell => val str = cell match { case null => "null" + case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]") case array: Array[_] => array.mkString("[", ", ", "]") case seq: Seq[_] => seq.mkString("[", ", ", "]") case _ => cell.toString diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 605a6549dd686..479bab1e7d449 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ 
-585,6 +585,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df.showString(10) === expectedAnswer) } + test("showString: binary") { + val df = Seq( + ("12".getBytes, "ABC.".getBytes), + ("34".getBytes, "12346".getBytes) + ).toDF() + val expectedAnswer = """+-------+----------------+ + || _1| _2| + |+-------+----------------+ + ||[31 32]| [41 42 43 2E]| + ||[33 34]|[31 32 33 34 36]| + |+-------+----------------+ + |""".stripMargin + assert(df.showString(10) === expectedAnswer) + } + test("showString: minimum column width") { val df = Seq( (1, 1), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 542e4d6c43b9f..7e76aa00c2c5e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -426,6 +426,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext { assert(ds.toString == "[_1: int, _2: int]") } + test("showString: Kryo encoder") { + implicit val kryoEncoder = Encoders.kryo[KryoData] + val ds = Seq(KryoData(1), KryoData(2)).toDS() + + val expectedAnswer = """+-----------+ + || value| + |+-----------+ + ||KryoData(1)| + ||KryoData(2)| + |+-----------+ + |""".stripMargin + assert(ds.showString(10) === expectedAnswer) + } + test("Kryo encoder") { implicit val kryoEncoder = Encoders.kryo[KryoData] val ds = Seq(KryoData(1), KryoData(2)).toDS() From 5553b0b8f5f064cdd5676fcf14008fd5036f6d4e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 15 Dec 2015 11:01:24 -0800 Subject: [PATCH 5/5] undo the changes on the comment indenting. --- .../scala/org/apache/spark/sql/Dataset.scala | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3d7915d62ebeb..c0bfc30a1d8ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -263,20 +263,20 @@ class Dataset[T] private[sql]( } /** - * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. - * @since 1.6.0 - */ + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * @since 1.6.0 + */ def repartition(numPartitions: Int): Dataset[T] = withPlan { Repartition(numPartitions, shuffle = true, _) } /** - * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. - * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. - * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. - * @since 1.6.0 - */ + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + * @since 1.6.0 + */ def coalesce(numPartitions: Int): Dataset[T] = withPlan { Repartition(numPartitions, shuffle = false, _) } @@ -733,47 +733,47 @@ class Dataset[T] private[sql]( def takeAsList(num: Int): java.util.List[T] = java.util.Arrays.asList(take(num) : _*) /** - * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). 
- * @since 1.6.0 - */ + * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). + * @since 1.6.0 + */ def persist(): this.type = { sqlContext.cacheManager.cacheQuery(this) this } /** - * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). - * @since 1.6.0 - */ + * Persist this [[Dataset]] with the default storage level (`MEMORY_AND_DISK`). + * @since 1.6.0 + */ def cache(): this.type = persist() /** - * Persist this [[Dataset]] with the given storage level. - * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, - * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, - * `MEMORY_AND_DISK_2`, etc. - * @group basic - * @since 1.6.0 - */ + * Persist this [[Dataset]] with the given storage level. + * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, + * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, + * `MEMORY_AND_DISK_2`, etc. + * @group basic + * @since 1.6.0 + */ def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) this } /** - * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. - * @param blocking Whether to block until all blocks are deleted. - * @since 1.6.0 - */ + * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. + * @param blocking Whether to block until all blocks are deleted. + * @since 1.6.0 + */ def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) this } /** - * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. - * @since 1.6.0 - */ + * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. + * @since 1.6.0 + */ def unpersist(): this.type = unpersist(blocking = false) /* ******************** *
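
For reference, a minimal, dependency-free Scala sketch of the rendering rules this series consolidates into Queryable.formatString: per-cell decoding (null, hex for Array[Byte], square brackets for arrays/seqs, 20-character truncation), column widths with a minimum of 3, "+---+" separator lines, and left-padding when truncate is set. The names ShowStringSketch, formatCell and renderTable are illustrative only and do not exist in Spark, and plain string padding stands in for the commons-lang3 StringUtils calls so the sketch runs standalone.

object ShowStringSketch {

  // Mirrors the per-cell rules in showString: null, binary-as-hex,
  // arrays/seqs in square brackets, and optional 20-character truncation.
  def formatCell(cell: Any, truncate: Boolean): String = {
    val str = cell match {
      case null => "null"
      case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
      case array: Array[_] => array.mkString("[", ", ", "]")
      case seq: Seq[_] => seq.mkString("[", ", ", "]")
      case other => other.toString
    }
    if (truncate && str.length > 20) str.substring(0, 17) + "..." else str
  }

  // Mirrors formatString: column widths start at 3, a separator line is drawn,
  // cells are right-aligned (left-padded) when truncating, else left-aligned.
  def renderTable(rows: Seq[Seq[String]], truncate: Boolean): String = {
    val numCols = rows.head.length
    val colWidths = Array.fill(numCols)(3)
    for (row <- rows; (cell, i) <- row.zipWithIndex) {
      colWidths(i) = math.max(colWidths(i), cell.length)
    }
    val sep = colWidths.map("-" * _).mkString("+", "+", "+\n")

    def pad(cell: String, width: Int): String = {
      val padding = " " * math.max(0, width - cell.length)
      if (truncate) padding + cell else cell + padding
    }

    val sb = new StringBuilder(sep)
    sb.append(rows.head.zipWithIndex.map { case (c, i) => pad(c, colWidths(i)) }
      .mkString("|", "|", "|\n"))
    sb.append(sep)
    rows.tail.foreach { row =>
      sb.append(row.zipWithIndex.map { case (c, i) => pad(c, colWidths(i)) }
        .mkString("|", "|", "|\n"))
    }
    sb.append(sep)
    sb.toString()
  }

  def main(args: Array[String]): Unit = {
    val header = Seq("_1", "_2")
    val data = Seq(
      Seq[Any]("12".getBytes, "ABC.".getBytes),
      Seq[Any]("34".getBytes, "12346".getBytes))
    val rows = header +: data.map(row => row.map(cell => formatCell(cell, truncate = true)))
    print(renderTable(rows, truncate = true))
  }
}

Running this prints the same two-row hex table that the new "showString: binary" test in DataFrameSuite expects.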
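
And a hedged end-to-end sketch of the user-visible change, modelled on the new "showString: Kryo encoder" test in DatasetSuite. The driver below is hypothetical (local master, ShowKryoExample and KryoData chosen to mirror the test fixture); with this patch, ds.show() formats the decoded objects directly via Dataset.showString instead of delegating to toDF() as the removed line did.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoders, SQLContext}

// Mirrors the KryoData fixture used by DatasetSuite.
case class KryoData(a: Int)

object ShowKryoExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("show-kryo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    implicit val kryoEncoder = Encoders.kryo[KryoData]
    val ds = Seq(KryoData(1), KryoData(2)).toDS()

    // With this patch the decoded values are shown:
    // +-----------+
    // |      value|
    // +-----------+
    // |KryoData(1)|
    // |KryoData(2)|
    // +-----------+
    ds.show()

    sc.stop()
  }
}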