Commit 8800eba

Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
2 parents: 32ac7af + a9ec033

45 files changed: +567 additions, -69 deletions


core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 3 additions & 0 deletions
@@ -360,6 +360,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         |
         |  --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
+        |  --help, -h                 Show this help message and exit
+        |  --verbose, -v              Print additional debug output
+        |
         | Spark standalone with cluster deploy mode only:
         |  --driver-cores NUM         Cores for driver (Default: 1).
         |  --supervise                If given, restarts the driver on failure.

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 16 additions & 0 deletions
@@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(sched.finishedManagers.contains(manager))
   }
 
+  test("skip unsatisfiable locality levels") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
+    val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // An executor that is not NODE_LOCAL should be rejected.
+    assert(manager.resourceOffer("execC", "host2", ANY) === None)
+
+    // Because there are no alive PROCESS_LOCAL executors, the base locality level should be
+    // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
+    // any of the locality wait timers expire.
+    assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
+  }
+
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 2 additions & 0 deletions
@@ -131,6 +131,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val OUTER = Keyword("OUTER")
   protected val RIGHT = Keyword("RIGHT")
   protected val SELECT = Keyword("SELECT")
+  protected val SEMI = Keyword("SEMI")
   protected val STRING = Keyword("STRING")
   protected val SUM = Keyword("SUM")
   protected val TRUE = Keyword("TRUE")
@@ -241,6 +242,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
 
   protected lazy val joinType: Parser[JoinType] =
     INNER ^^^ Inner |
+    LEFT ~ SEMI ^^^ LeftSemi |
     LEFT ~ opt(OUTER) ^^^ LeftOuter |
     RIGHT ~ opt(OUTER) ^^^ RightOuter |
    FULL ~ opt(OUTER) ^^^ FullOuter
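
With the new SEMI keyword and the LEFT ~ SEMI alternative in joinType, the parser accepts LEFT SEMI JOIN syntax. A minimal sketch of a query the extended grammar should now parse (the sqlContext value and the table and column names here are hypothetical):

    // Keeps each row of `orders` that has at least one match in `customers`;
    // no columns from `customers` appear in the result.
    val matched = sqlContext.sql(
      "SELECT * FROM orders LEFT SEMI JOIN customers ON orders.custId = customers.id")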

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 5 additions & 0 deletions
@@ -119,6 +119,11 @@ object HashFilteredJoin extends Logging with PredicateHelper {
     case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
       logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
       splitPredicates(predicates ++ condition, join)
+    // All predicates can be evaluated for a left semi join (those that are in the WHERE
+    // clause can only be from the left table, so they can all be pushed down).
+    case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) =>
+      logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
     case join @ Join(left, right, joinType, condition) =>
       logger.debug(s"Considering hash join on: $condition")
       splitPredicates(condition.toSeq, join)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala

Lines changed: 1 addition & 0 deletions
@@ -22,3 +22,4 @@ case object Inner extends JoinType
 case object LeftOuter extends JoinType
 case object RightOuter extends JoinType
 case object FullOuter extends JoinType
+case object LeftSemi extends JoinType

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 5 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.catalyst.types.{StringType, StructType}
 import org.apache.spark.sql.catalyst.trees
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -102,7 +102,7 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output = Seq.empty
+  def output: Seq[Attribute] = Seq.empty
 }
 
 /**
@@ -115,7 +115,9 @@ case class NativeCommand(cmd: String) extends Command
  * Returned by a parser when the user only wants to see what query plan would be executed, without
  * actually performing the execution.
  */
-case class ExplainCommand(plan: LogicalPlan) extends Command
+case class ExplainCommand(plan: LogicalPlan) extends Command {
+  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+}
 
 /**
  * A logical plan node with a single child.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 7 additions & 2 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.{LeftSemi, JoinType}
 import org.apache.spark.sql.catalyst.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
@@ -81,7 +81,12 @@ case class Join(
     condition: Option[Expression]) extends BinaryNode {
 
   def references = condition.map(_.references).getOrElse(Set.empty)
-  def output = left.output ++ right.output
+  def output = joinType match {
+    case LeftSemi =>
+      left.output
+    case _ =>
+      left.output ++ right.output
+  }
 }
 
 case class InsertIntoTable(
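
Because a LeftSemi join now reports only left.output, attributes of the right relation are not visible above the join. A hedged illustration of the consequence (names hypothetical): the projection below resolves because custId comes from the left side, whereas referencing customers.name above the semi join would fail to resolve.

    // Only left-side attributes can be referenced above a LEFT SEMI JOIN.
    sqlContext.sql(
      "SELECT orders.custId FROM orders LEFT SEMI JOIN customers ON orders.custId = customers.id")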

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 7 additions & 5 deletions
@@ -191,8 +191,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val sparkContext = self.sparkContext
 
     val strategies: Seq[Strategy] =
+      CommandStrategy(self) ::
       TakeOrdered ::
       PartialAggregation ::
+      LeftSemiJoin ::
       HashJoin ::
       ParquetOperations ::
       BasicOperators ::
@@ -255,6 +257,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
       Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
   }
 
+  // TODO: or should we make QueryExecution protected[sql]?
+  protected[sql] def mkQueryExecution(plan: LogicalPlan) = new QueryExecution {
+    val logical = plan
+  }
+
   /**
    * The primary workflow for executing relational queries using Spark. Designed to allow easy
    * access to the intermediate phases of query execution for developers.
@@ -284,11 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
-    /**
-     * Runs the query after interposing operators that print the result of each intermediate step.
-     */
-    def debugExec() = DebugQuery(executedPlan).execute().collect()
   }
 
   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 27 additions & 0 deletions
@@ -28,6 +28,22 @@ import org.apache.spark.sql.parquet._
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
 
+  object LeftSemiJoin extends Strategy with PredicateHelper {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      // Find left semi joins where at least some predicates can be evaluated by matching hash
+      // keys using the HashFilteredJoin pattern.
+      case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
+        val semiJoin = execution.LeftSemiJoinHash(
+          leftKeys, rightKeys, planLater(left), planLater(right))
+        condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
+      // no predicate can be evaluated by matching hash keys
+      case logical.Join(left, right, LeftSemi, condition) =>
+        execution.LeftSemiJoinBNL(
+          planLater(left), planLater(right), condition)(sparkContext) :: Nil
+      case _ => Nil
+    }
+  }
+
   object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       // Find inner joins where at least some predicates can be evaluated by matching hash keys
@@ -217,4 +233,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
+
+  // TODO: this should be merged with SPARK-1508's SetCommandStrategy
+  case class CommandStrategy(context: SQLContext) extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.ExplainCommand(child) =>
+        val qe = context.mkQueryExecution(child)
+        Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+      case _ => Nil
+    }
+  }
+
 }
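
The LeftSemiJoin strategy prefers the hash-based operator when HashFilteredJoin can extract equi-join keys, and otherwise falls back to the broadcast nested loop operator. A rough sketch of the two cases (tables and sqlContext are hypothetical; the physical operator actually chosen also depends on the rest of the plan):

    // Equality on the join keys: expected to plan to execution.LeftSemiJoinHash.
    sqlContext.sql("SELECT * FROM a LEFT SEMI JOIN b ON a.key = b.key")

    // No hashable equality predicate: expected to fall back to execution.LeftSemiJoinBNL.
    sqlContext.sql("SELECT * FROM a LEFT SEMI JOIN b ON a.key < b.key")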

sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 10 additions & 23 deletions
@@ -17,29 +17,16 @@
 
 package org.apache.spark.sql.execution
 
-private[sql] object DebugQuery {
-  def apply(plan: SparkPlan): SparkPlan = {
-    val visited = new collection.mutable.HashSet[Long]()
-    plan transform {
-      case s: SparkPlan if !visited.contains(s.id) =>
-        visited += s.id
-        DebugNode(s)
-    }
-  }
-}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
-private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
-  def references = Set.empty
-  def output = child.output
-  def execute() = {
-    val childRdd = child.execute()
-    println(
-      s"""
-        |=========================
-        |${child.simpleString}
-        |=========================
-      """.stripMargin)
-    childRdd.foreach(println(_))
-    childRdd
+case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
+                                 (@transient context: SQLContext) extends UnaryNode {
+  def execute(): RDD[Row] = {
+    val planString = new GenericRow(Array[Any](child.toString))
+    context.sparkContext.parallelize(Seq(planString))
   }
+
+  override def otherCopyArgs = context :: Nil
 }
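
Taken together with CommandStrategy and SQLContext.mkQueryExecution, ExplainCommandPhysical turns an EXPLAIN into a plan whose execution returns the formatted physical plan as a single string row. A minimal sketch of that flow, assuming code inside the org.apache.spark.sql packages with an existing SQLContext (sqlContext) and an already-parsed logical plan (plan), both hypothetical here:

    // Build a QueryExecution for the plan being explained, wrap its physical plan,
    // and collect the single Row whose only column is the plan's string form.
    val qe = sqlContext.mkQueryExecution(plan)
    val explain = ExplainCommandPhysical(qe.executedPlan,
      Seq(AttributeReference("plan", StringType, nullable = false)()))(sqlContext)
    explain.execute().collect().foreach(println)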
