7 changes: 5 additions & 2 deletions docs/configuration.md
@@ -639,6 +639,7 @@ Apart from these, the following properties are also available, and may be useful
<td>false</td>
<td>
Whether to compress logged events, if <code>spark.eventLog.enabled</code> is true.
Compression will use <code>spark.io.compression.codec</code>.
</td>
</tr>
<tr>
@@ -773,14 +774,15 @@ Apart from these, the following properties are also available, and may be useful
<td>true</td>
<td>
Whether to compress broadcast variables before sending them. Generally a good idea.
Compression will use <code>spark.io.compression.codec</code>.
</td>
</tr>
<tr>
<td><code>spark.io.compression.codec</code></td>
<td>lz4</td>
<td>
The codec used to compress internal data such as RDD partitions, broadcast variables and
shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
The codec used to compress internal data such as RDD partitions, event log, broadcast variables
and shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
and <code>snappy</code>. You can also use fully qualified class names to specify the codec,
e.g.
<code>org.apache.spark.io.LZ4CompressionCodec</code>,
@@ -881,6 +883,7 @@ Apart from these, the following properties are also available, and may be useful
<code>StorageLevel.MEMORY_ONLY_SER</code> in Java
and Scala or <code>StorageLevel.MEMORY_ONLY</code> in Python).
Can save substantial space at the cost of some extra CPU time.
Compression will use <code>spark.io.compression.codec</code>.
</td>
</tr>
<tr>
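Reviewer note, not part of the diff: the documentation rows above all defer to the same codec setting. A minimal sketch of how the related properties might be set together (values are illustrative, standard SparkConf/SparkSession APIs assumed):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// All three compress flags use the codec chosen by spark.io.compression.codec.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")    // event log compressed with the codec below
  .set("spark.broadcast.compress", "true")   // broadcast variables compressed with the codec below
  .set("spark.rdd.compress", "true")         // serialized RDD partitions likewise
  .set("spark.io.compression.codec", "lz4")  // or lzf, snappy, or a fully qualified class name

val spark = SparkSession.builder().config(conf).getOrCreate()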
@@ -383,7 +383,7 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {

// set isBroadcastable to true so the child will be broadcasted
override def computeStats(conf: CatalystConf): Statistics =
super.computeStats(conf).copy(isBroadcastable = true)
child.stats(conf).copy(isBroadcastable = true)
}
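For context, a short usage sketch (the DataFrame names are hypothetical): the hint only flips isBroadcastable, and with the child.stats(conf) change above the hinted side keeps the child's size, row-count and column statistics instead of recomputed defaults.

import org.apache.spark.sql.functions.broadcast

// largeDf and smallDf are assumed, already-defined DataFrames sharing an "id" column.
// broadcast() wraps the right side's logical plan in BroadcastHint.
val joined = largeDf.join(broadcast(smallDf), Seq("id"))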

/**
@@ -35,6 +35,23 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
// row count * (overhead + column size)
size = Some(10 * (8 + 4)))

test("BroadcastHint estimation") {
val filter = Filter(Literal(true), plan)
val filterStatsCboOn = Statistics(sizeInBytes = 10 * (8 + 4), isBroadcastable = false,
rowCount = Some(10), attributeStats = AttributeMap(Seq(attribute -> colStat)))
val filterStatsCboOff = Statistics(sizeInBytes = 10 * (8 + 4), isBroadcastable = false)
checkStats(
filter,
expectedStatsCboOn = filterStatsCboOn,
expectedStatsCboOff = filterStatsCboOff)

val broadcastHint = BroadcastHint(filter)
checkStats(
broadcastHint,
expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true),
expectedStatsCboOff = filterStatsCboOff.copy(isBroadcastable = true))
}

test("limit estimation: limit < child's rowCount") {
val localLimit = LocalLimit(Literal(2), plan)
val globalLimit = GlobalLimit(Literal(2), plan)
@@ -97,8 +114,10 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
plan: LogicalPlan,
expectedStatsCboOn: Statistics,
expectedStatsCboOff: Statistics): Unit = {
assert(plan.stats(conf.copy(cboEnabled = true)) == expectedStatsCboOn)
// Invalidate statistics
plan.invalidateStatsCache()
assert(plan.stats(conf.copy(cboEnabled = true)) == expectedStatsCboOn)

plan.invalidateStatsCache()
assert(plan.stats(conf.copy(cboEnabled = false)) == expectedStatsCboOff)
}
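The invalidation order matters here. A schematic sketch of the memoization being worked around (assuming the plan caches its statistics, as invalidateStatsCache suggests; this is not the actual LogicalPlanStats code):

// Once computed, the cached Statistics would be returned regardless of the conf
// passed in later, so the helper clears the cache before each assertion.
abstract class CachedStatsSketch {
  protected def computeStats(conf: CatalystConf): Statistics  // types as used above
  private var statsCache: Option[Statistics] = None
  final def stats(conf: CatalystConf): Statistics = {
    if (statsCache.isEmpty) statsCache = Some(computeStats(conf))
    statsCache.get
  }
  final def invalidateStatsCache(): Unit = { statsCache = None }
}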
25 changes: 15 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,7 +17,6 @@

package org.apache.spark.sql

import java.beans.BeanInfo
import java.util.Properties

import scala.collection.immutable
@@ -527,8 +526,9 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(tableName: String, path: String): DataFrame = {
sparkSession.catalog.createExternalTable(tableName, path)
sparkSession.catalog.createTable(tableName, path)
}

/**
@@ -538,11 +538,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
path: String,
source: String): DataFrame = {
sparkSession.catalog.createExternalTable(tableName, path, source)
sparkSession.catalog.createTable(tableName, path, source)
}

/**
@@ -552,11 +553,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
options: java.util.Map[String, String]): DataFrame = {
sparkSession.catalog.createExternalTable(tableName, source, options)
sparkSession.catalog.createTable(tableName, source, options)
}

/**
@@ -567,11 +569,12 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
options: Map[String, String]): DataFrame = {
sparkSession.catalog.createExternalTable(tableName, source, options)
sparkSession.catalog.createTable(tableName, source, options)
}

/**
@@ -581,12 +584,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
sparkSession.catalog.createExternalTable(tableName, source, schema, options)
sparkSession.catalog.createTable(tableName, source, schema, options)
}

/**
@@ -597,12 +601,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* @group ddl_ops
* @since 1.3.0
*/
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
sparkSession.catalog.createExternalTable(tableName, source, schema, options)
sparkSession.catalog.createTable(tableName, source, schema, options)
}
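The deprecated overloads above all forward to the Catalog API. A small migration sketch (table name, path and source are illustrative; sqlContext and spark are assumed existing handles):

// Before (deprecated as of 2.2.0): SQLContext.createExternalTable
val before = sqlContext.createExternalTable("events", "/data/events", "parquet")

// After: the equivalent Catalog call that the deprecated method now delegates to
val after = spark.catalog.createTable("events", "/data/events", "parquet")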

/**
@@ -1089,9 +1094,9 @@ object SQLContext {
* method for internal use.
*/
private[sql] def beansToRows(
data: Iterator[_],
beanClass: Class[_],
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
data: Iterator[_],
beanClass: Class[_],
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
val extractors =
JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>