Changes from all commits (26 commits)
aa7120e
Initial Broadcast design
hvanhovell Feb 4, 2016
c2b7533
Fix Exchange and initial code gen attempt.
hvanhovell Feb 4, 2016
6a5568a
Move broadcast retreval to SparkPlan
hvanhovell Feb 6, 2016
9adecdd
Merge remote-tracking branch 'spark/master' into SPARK-13136
hvanhovell Feb 6, 2016
d0194fb
Fix Codegen & Add other broadcast joins.
hvanhovell Feb 6, 2016
02a61b8
Minor touchup
hvanhovell Feb 6, 2016
c12c8e6
Move broadcast relation retrieval.
hvanhovell Feb 7, 2016
c7dd7ae
Remove codegen from broadcast.
hvanhovell Feb 8, 2016
e847383
Merge remote-tracking branch 'spark/master' into SPARK-13136
hvanhovell Feb 10, 2016
d73f11c
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 12, 2016
9c0f4bf
Remove closure passing.
hvanhovell Feb 14, 2016
da4a966
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 14, 2016
681f347
Move transform into BroadcastMode
hvanhovell Feb 15, 2016
7db240a
Clean-up
hvanhovell Feb 15, 2016
3ad839d
Code Review.
hvanhovell Feb 16, 2016
1116768
No newline at EOF :(
hvanhovell Feb 16, 2016
a5501cf
Rename exchanges and merge Broadcast.scala into exchange.scala.
hvanhovell Feb 17, 2016
c7429bb
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 17, 2016
b12bbc2
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 20, 2016
9d52650
Revert renaming of variabels in LeftSemiJoinBNL.
hvanhovell Feb 20, 2016
54b558d
Revert renaming of variabels in LeftSemiJoinBNL.
hvanhovell Feb 20, 2016
f33d2cb
Move all exchange related operators into the exchange package.
hvanhovell Feb 21, 2016
28363c8
CR
hvanhovell Feb 21, 2016
f812a31
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 21, 2016
4b5978b
put broadcast mode in a separate file.
hvanhovell Feb 21, 2016
c8c175e
Fix style in sqlcontext.
hvanhovell Feb 21, 2016
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.physical

import org.apache.spark.sql.catalyst.InternalRow

/**
* Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
* identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
*/
trait BroadcastMode {
def transform(rows: Array[InternalRow]): Any
}

/**
* IdentityBroadcastMode requires that rows are broadcasted in their original form.
*/
case object IdentityBroadcastMode extends BroadcastMode {
override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
}
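IdentityBroadcastMode is the simplest mode: the collected rows are broadcasted as-is. As a hedged illustration of what a non-identity mode could look like, the sketch below (hypothetical, not part of this patch) materializes the broadcasted rows into a Set so that a consuming operator can do constant-time membership checks; the name KeySetBroadcastMode and the use of InternalRow.copy() are assumptions made for the example.

// Hypothetical sketch of a non-identity BroadcastMode.
case object KeySetBroadcastMode extends BroadcastMode {
  // Copy each row before collecting it, since the input array may reuse row objects.
  override def transform(rows: Array[InternalRow]): Set[InternalRow] =
    rows.map(_.copy()).toSet
}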
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.physical

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, IntegerType}

@@ -75,6 +76,12 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
def clustering: Set[Expression] = ordering.map(_.child).toSet
}

/**
* Represents data where tuples are broadcasted to every node. It is quite common that the
* entire set of tuples is transformed into a different data structure.
*/
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
Contributor comment:
ok for now, but technically once we introduce this, "Distribution" is not just about "distribution" anymore, but rather some data property.


/**
* Describes how an operator's output is split across partitions. The `compatibleWith`,
* `guarantees`, and `satisfies` methods describe relationships between child partitionings,
@@ -213,7 +220,10 @@ case object SinglePartition extends Partitioning {
case object SinglePartition extends Partitioning {
val numPartitions = 1

override def satisfies(required: Distribution): Boolean = true
override def satisfies(required: Distribution): Boolean = required match {
case _: BroadcastDistribution => false
Contributor comment:
i think this is ok for now, but technically we don't need to introduce an exchange if both sides of the join have only one partition. i guess this framework does not currently handle that.

case _ => true
}

override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1

@@ -351,3 +361,21 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
partitionings.map(_.toString).mkString("(", " or ", ")")
}
}

/**
* Represents a partitioning where rows are collected, transformed and broadcasted to each
* node in the cluster.
*/
case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
Contributor comment:
and partitioning here isn't really just about partitioning anymore. it also includes how data is physically organized on each partition.

override val numPartitions: Int = 1

override def satisfies(required: Distribution): Boolean = required match {
case BroadcastDistribution(m) if m == mode => true
case _ => false
}

override def compatibleWith(other: Partitioning): Boolean = other match {
case BroadcastPartitioning(m) if m == mode => true
case _ => false
}
}
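A minimal sketch of how BroadcastPartitioning and BroadcastDistribution interact, assuming the definitions above are in scope: an operator whose output partitioning is BroadcastPartitioning(m) satisfies a required BroadcastDistribution(m) only when the modes match, which is how the planner decides whether a broadcast exchange still has to be inserted.

// Sketch only; assumes the classes defined in this package.
val broadcastPartitioning = BroadcastPartitioning(IdentityBroadcastMode)
broadcastPartitioning.satisfies(BroadcastDistribution(IdentityBroadcastMode)) // true: modes match
broadcastPartitioning.satisfies(AllTuples)                                    // false: not a broadcast requirement
SinglePartition.satisfies(BroadcastDistribution(IdentityBroadcastMode))       // false: per the match above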
12 changes: 6 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
@@ -59,7 +60,6 @@ import org.apache.spark.util.Utils
* @groupname config Configuration
* @groupname dataframes Custom DataFrame Creation
* @groupname Ungrouped Support functions for language integrated queries
*
* @since 1.0.0
*/
class SQLContext private[sql](
@@ -313,10 +313,10 @@ class SQLContext private[sql](
}

/**
* Returns true if the [[Queryable]] is currently cached in-memory.
* @group cachemgmt
* @since 1.3.0
*/
* Returns true if the [[Queryable]] is currently cached in-memory.
* @group cachemgmt
* @since 1.3.0
*/
private[sql] def isCached(qName: Queryable): Boolean = {
cacheManager.lookupCachedData(qName).nonEmpty
}
@@ -364,6 +364,7 @@ class SQLContext private[sql](

/**
* Converts $"col name" into a [[Column]].
*
* @since 1.3.0
*/
// This must live here to preserve binary compatibility with Spark < 1.5.
@@ -728,7 +729,6 @@ class SQLContext private[sql](
* cached/persisted before, it's also unpersisted.
*
* @param tableName the name of the table to be unregistered.
*
* @group basic
* @since 1.3.0
*/
@@ -24,6 +24,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.Logging
import org.apache.spark.broadcast
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -108,15 +109,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
* Concrete implementations of SparkPlan should override doExecute instead.
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute after
* preparations. Concrete implementations of SparkPlan should override doExecute.
*/
final def execute(): RDD[InternalRow] = {
final def execute(): RDD[InternalRow] = executeQuery {
doExecute()
}

/**
* Returns the result of this query as a broadcast variable by delegating to doExecuteBroadcast
* after preparations. Concrete implementations of SparkPlan should override doExecuteBroadcast.
*/
final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
doExecuteBroadcast()
}

/**
* Execute a query after preparing the query and adding query plan information to created RDDs
* for visualization.
*/
private final def executeQuery[T](query: => T): T = {
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
waitForSubqueries()
doExecute()
query
}
}
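To illustrate how an operator would consume the new executeBroadcast path, here is a hypothetical sketch (the operator name, the choice of IdentityBroadcastMode and the probe logic are assumptions, not part of this patch): it asks the planner to broadcast its right child via requiredChildDistribution and then reads the broadcast value inside doExecute.

// Hypothetical consumer of executeBroadcast, for illustration only.
case class ExampleBroadcastProbe(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  override def output: Seq[Attribute] = left.output

  // Ask the planner to broadcast the right side in its original row form.
  override def requiredChildDistribution: Seq[Distribution] =
    UnspecifiedDistribution :: BroadcastDistribution(IdentityBroadcastMode) :: Nil

  override protected def doExecute(): RDD[InternalRow] = {
    // Returns the broadcast built by the BroadcastExchange planned under `right`.
    val broadcastRows = right.executeBroadcast[Array[InternalRow]]()
    left.execute().mapPartitions { iter =>
      val rows = broadcastRows.value
      iter.filter(_ => rows.nonEmpty) // placeholder probe logic
    }
  }
}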

@@ -187,6 +203,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
protected def doExecute(): RDD[InternalRow]

/**
* Overridden by concrete implementations of SparkPlan.
* Produces the result of the query as a broadcast variable.
*/
protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
throw new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
}

/**
* Runs this query returning the result as an array.
*/
@@ -17,14 +17,16 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{execution, Strategy}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
@@ -328,7 +330,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
execution.Coalesce(numPartitions, planLater(child)) :: Nil
}
@@ -367,7 +369,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case r @ logical.Range(start, end, step, numSlices, output) =>
execution.Range(start, step, numSlices, r.numElements, output) :: Nil
case logical.RepartitionByExpression(expressions, child, nPartitions) =>
execution.Exchange(HashPartitioning(
exchange.ShuffleExchange(HashPartitioning(
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
case e @ python.EvaluatePython(udf, child, _) =>
python.BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -171,6 +172,10 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
child.execute()
}

override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
child.doExecuteBroadcast()
}

override def supportCodegen: Boolean = false

override def upstream(): RDD[InternalRow] = {
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.exchange

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
import org.apache.spark.util.ThreadUtils

/**
* A [[BroadcastExchange]] collects, transforms and finally broadcasts the result of a
* SparkPlan.
*/
case class BroadcastExchange(
mode: BroadcastMode,
child: SparkPlan) extends UnaryNode {

override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

@transient
private val timeout: Duration = {
val timeoutValue = sqlContext.conf.broadcastTimeout
if (timeoutValue < 0) {
Duration.Inf
} else {
timeoutValue.seconds
}
}

@transient
private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
// relationFuture is used in "doExecuteBroadcast". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
// Note that we use .executeCollect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = child.executeCollect()

// Construct and broadcast the relation.
sparkContext.broadcast(mode.transform(input))
}
}(BroadcastExchange.executionContext)
}

override protected def doPrepare(): Unit = {
// Materialize the future.
relationFuture
}

override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
"BroadcastExchange does not support the execute() code path.")
}

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
val result = Await.result(relationFuture, timeout)
result.asInstanceOf[broadcast.Broadcast[T]]
}
}

object BroadcastExchange {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}
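The exchange is not constructed by hand; the planner inserts it. A sketch of that decision, assuming the definitions in this patch (this is not the actual EnsureRequirements code): when a child's output partitioning does not satisfy a required BroadcastDistribution, the child is wrapped in a BroadcastExchange with the requested mode, and the consuming operator later retrieves the value through executeBroadcast().

// Sketch, not the actual planner rule.
def ensureBroadcast(child: SparkPlan, required: Distribution): SparkPlan = required match {
  case BroadcastDistribution(mode) if !child.outputPartitioning.satisfies(required) =>
    BroadcastExchange(mode, child)
  case _ =>
    child
}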