From cf5963c961e7eba37bdd58658ed4dfff66ce3c72 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E5=B0=8F=E9=BE=99=2010207633?=
Date: Sat, 1 Apr 2017 11:48:58 +0100
Subject: [PATCH 1/3] =?UTF-8?q?[SPARK-20177]=20Document=20about=20compress?=
 =?UTF-8?q?ion=20way=20has=20some=20little=20detail=20ch=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…anges.

## What changes were proposed in this pull request?

Small corrections to the documentation of compression-related settings:

1. `spark.eventLog.compress`: add "Compression will use `spark.io.compression.codec`."
2. `spark.broadcast.compress`: add "Compression will use `spark.io.compression.codec`."
3. `spark.rdd.compress`: add "Compression will use `spark.io.compression.codec`."
4. `spark.io.compression.codec`: mention the event log among the data this codec compresses; previously the documentation did not say which codec is used to compress the event log.

## How was this patch tested?

Manual tests.

Author: 郭小龙 10207633

Closes #17498 from guoxiaolongzte/SPARK-20177.
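For illustration only (not part of the original patch), a minimal sketch of how
these settings interact; the codec value and event-log directory are example
choices, and the directory must already exist:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// spark.io.compression.codec selects the codec; the three *.compress flags
// only toggle whether each kind of data is compressed with it.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "lz4")       // shared codec ("lz4" is the default)
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events") // must exist before the app starts
  .set("spark.eventLog.compress", "true")         // event log: compressed with the codec above
  .set("spark.broadcast.compress", "true")        // broadcast variables: same codec
  .set("spark.rdd.compress", "true")              // serialized RDD partitions: same codec

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("compression-codec-demo")
  .config(conf)
  .getOrCreate()
```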
---
 docs/configuration.md | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index a9753925407d..2687f542b8bd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -639,6 +639,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to compress logged events, if <code>spark.eventLog.enabled</code> is true.
+    Compression will use <code>spark.io.compression.codec</code>.
   </td>
 </tr>
 <tr>
@@ -773,14 +774,15 @@ Apart from these, the following properties are also available, and may be useful
   <td>true</td>
   <td>
     Whether to compress broadcast variables before sending them. Generally a good idea.
+    Compression will use <code>spark.io.compression.codec</code>.
   </td>
 </tr>
 <tr>
   <td><code>spark.io.compression.codec</code></td>
   <td>lz4</td>
   <td>
-    The codec used to compress internal data such as RDD partitions, broadcast variables and
-    shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
+    The codec used to compress internal data such as RDD partitions, event log, broadcast variables
+    and shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
     and <code>snappy</code>. You can also use fully qualified class names to specify the codec,
     e.g.
     <code>org.apache.spark.io.LZ4CompressionCodec</code>,
@@ -881,6 +883,7 @@ Apart from these, the following properties are also available, and may be useful
     <code>StorageLevel.MEMORY_ONLY_SER</code> in Java and Scala or
     <code>StorageLevel.MEMORY_ONLY</code> in Python).
     Can save substantial space at the cost of some extra CPU time.
+    Compression will use <code>spark.io.compression.codec</code>.
   </td>
 </tr>

From 89d6822f722912d2b05571a95a539092091650b5 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Sat, 1 Apr 2017 20:43:13 +0800
Subject: [PATCH 2/3] [SPARK-19148][SQL][FOLLOW-UP] do not expose the external
 table concept in Catalog

### What changes were proposed in this pull request?

After renaming `Catalog.createExternalTable` to `createTable` in PR
https://github.com/apache/spark/pull/16528, we also need to deprecate the
corresponding functions in `SQLContext`.

### How was this patch tested?

N/A

Author: Xiao Li

Closes #17502 from gatorsmile/deprecateCreateExternalTable.
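For illustration only (not part of the original patch), a minimal migration
sketch; the table name and path are placeholders, and the default data source
(parquet) is assumed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("createTable-migration")
  .getOrCreate()

// Deprecated as of 2.2.0:
//   spark.sqlContext.createExternalTable("my_table", "/path/to/data")
// Preferred replacement; despite the new name it still registers an
// external (unmanaged) table backed by the given path:
val df = spark.catalog.createTable("my_table", "/path/to/data")
```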
---
 .../org/apache/spark/sql/SQLContext.scala | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 234ef2dffc6b..cc2983987eb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.beans.BeanInfo
 import java.util.Properties
 
 import scala.collection.immutable
@@ -527,8 +526,9 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
+  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
   def createExternalTable(tableName: String, path: String): DataFrame = {
-    sparkSession.catalog.createExternalTable(tableName, path)
+    sparkSession.catalog.createTable(tableName, path)
   }
 
   /**
@@ -538,11 +538,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
+  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       path: String,
       source: String): DataFrame = {
-    sparkSession.catalog.createExternalTable(tableName, path, source)
+    sparkSession.catalog.createTable(tableName, path, source)
   }
 
   /**
@@ -552,11 +553,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
+  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
       options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.catalog.createExternalTable(tableName, source, options)
+    sparkSession.catalog.createTable(tableName, source, options)
   }
 
   /**
@@ -567,11 +569,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
+  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    sparkSession.catalog.createExternalTable(tableName, source, options)
+    sparkSession.catalog.createTable(tableName, source, options)
   }
 
   /**
@@ -581,12 +584,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
+  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
       schema: StructType,
       options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.catalog.createExternalTable(tableName, source, schema, options)
+    sparkSession.catalog.createTable(tableName, source, schema, options)
   }
 
   /**
@@ -597,12 +601,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
+  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    sparkSession.catalog.createExternalTable(tableName, source, schema, options)
+    sparkSession.catalog.createTable(tableName, source, schema, options)
  }
 
   /**
@@ -1089,9 +1094,9 @@ object SQLContext {
    * method for internal use.
    */
   private[sql] def beansToRows(
-    data: Iterator[_],
-    beanClass: Class[_],
-    attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
+      data: Iterator[_],
+      beanClass: Class[_],
+      attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors = JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
     val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>

From 2287f3d0b85730995bedc489a017de5700d6e1e4 Mon Sep 17 00:00:00 2001
From: wangzhenhua
Date: Sat, 1 Apr 2017 22:19:08 +0800
Subject: [PATCH 3/3] [SPARK-20186][SQL] BroadcastHint should use child's stats

## What changes were proposed in this pull request?

`BroadcastHint` should use the child's statistics and set `isBroadcastable` to true.

## How was this patch tested?

Added a new stats estimation test for `BroadcastHint`.

Author: wangzhenhua

Closes #17504 from wzhfy/broadcastHintEstimation.
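For illustration only (not part of the original patch), a sketch of how a
`BroadcastHint` node typically enters a plan; the DataFrames are made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-hint-demo")
  .getOrCreate()

val large = spark.range(1000000L).toDF("id")
val small = spark.range(100L).toDF("id")

// broadcast() wraps `small` in a BroadcastHint node. With this change the
// hint reports the child's statistics (size, row count, column stats) with
// only isBroadcastable flipped to true, rather than a generic estimate.
large.join(broadcast(small), "id").explain()
```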
---
 .../plans/logical/basicLogicalOperators.scala |  2 +-
 .../BasicStatsEstimationSuite.scala           | 21 ++++++++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5cbf263d1ce4..19db42c80895 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -383,7 +383,7 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
 
   // set isBroadcastable to true so the child will be broadcasted
   override def computeStats(conf: CatalystConf): Statistics =
-    super.computeStats(conf).copy(isBroadcastable = true)
+    child.stats(conf).copy(isBroadcastable = true)
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index e5dc811c8b7d..0d92c1e35565 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -35,6 +35,23 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
     // row count * (overhead + column size)
     size = Some(10 * (8 + 4)))
 
+  test("BroadcastHint estimation") {
+    val filter = Filter(Literal(true), plan)
+    val filterStatsCboOn = Statistics(sizeInBytes = 10 * (8 + 4), isBroadcastable = false,
+      rowCount = Some(10), attributeStats = AttributeMap(Seq(attribute -> colStat)))
+    val filterStatsCboOff = Statistics(sizeInBytes = 10 * (8 + 4), isBroadcastable = false)
+    checkStats(
+      filter,
+      expectedStatsCboOn = filterStatsCboOn,
+      expectedStatsCboOff = filterStatsCboOff)
+
+    val broadcastHint = BroadcastHint(filter)
+    checkStats(
+      broadcastHint,
+      expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true),
+      expectedStatsCboOff = filterStatsCboOff.copy(isBroadcastable = true))
+  }
+
   test("limit estimation: limit < child's rowCount") {
     val localLimit = LocalLimit(Literal(2), plan)
     val globalLimit = GlobalLimit(Literal(2), plan)
@@ -97,8 +114,10 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
       plan: LogicalPlan,
       expectedStatsCboOn: Statistics,
       expectedStatsCboOff: Statistics): Unit = {
-    assert(plan.stats(conf.copy(cboEnabled = true)) == expectedStatsCboOn)
     // Invalidate statistics
+    plan.invalidateStatsCache()
+    assert(plan.stats(conf.copy(cboEnabled = true)) == expectedStatsCboOn)
+
     plan.invalidateStatsCache()
     assert(plan.stats(conf.copy(cboEnabled = false)) == expectedStatsCboOff)
   }