
Commit 83adb2f

Merge remote-tracking branch 'upstream/master' into SPARK-2177
2 parents 366f891 + 640c294

File tree

10 files changed: +43 -27 lines changed


core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion
@@ -446,7 +446,7 @@ abstract class RDD[T: ClassTag](
    * Return this RDD sorted by the given key function.
    */
   def sortBy[K](
-      f: (T) ⇒ K,
+      f: (T) => K,
       ascending: Boolean = true,
       numPartitions: Int = this.partitions.size)
       (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
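
A minimal usage sketch of the corrected signature (not part of the commit; assumes an existing SparkContext named sc):

  val words = sc.parallelize(Seq("banana", "fig", "apple"))
  // The key function matches f: (T) => K, here String => Int: sort by length, longest first.
  val byLength = words.sortBy(w => w.length, ascending = false)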

core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -120,7 +120,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     // on SparkConf settings.

     def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy](
-        properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = {
+        properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): Unit = {

       // Set spark conf properties
       val conf = new SparkConf
@@ -129,8 +129,9 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
       }

       // Create and test file appender
-      val inputStream = new PipedInputStream(new PipedOutputStream())
-      val appender = FileAppender(inputStream, new File("stdout"), conf)
+      val testOutputStream = new PipedOutputStream()
+      val testInputStream = new PipedInputStream(testOutputStream)
+      val appender = FileAppender(testInputStream, testFile, conf)
       assert(appender.isInstanceOf[ExpectedAppender])
       assert(appender.getClass.getSimpleName ===
         classTag[ExpectedAppender].runtimeClass.getSimpleName)
@@ -144,7 +145,8 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
       }
       assert(policyParam === expectedRollingPolicyParam)
     }
-      appender
+      testOutputStream.close()
+      appender.awaitTermination()
     }

   import RollingFileAppender._

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 1 addition & 1 deletion
@@ -419,7 +419,7 @@ class RowMatrix(
   /** Updates or verifies the number of rows. */
   private def updateNumRows(m: Long) {
     if (nRows <= 0) {
-      nRows == m
+      nRows = m
     } else {
       require(nRows == m,
         s"The number of rows $m is different from what specified or previously computed: ${nRows}.")

python/pyspark/java_gateway.py

Lines changed: 11 additions & 4 deletions
@@ -43,12 +43,19 @@ def launch_gateway():
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
             signal.signal(signal.SIGINT, signal.SIG_IGN)
-        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE, preexec_fn=preexec_func)
     else:
         # preexec_fn not supported on Windows
-        proc = Popen(command, stdout=PIPE, stdin=PIPE)
-    # Determine which ephemeral port the server started on:
-    gateway_port = int(proc.stdout.readline())
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE)
+
+    try:
+        # Determine which ephemeral port the server started on:
+        gateway_port = int(proc.stdout.readline())
+    except:
+        error_code = proc.poll()
+        raise Exception("Launching GatewayServer failed with exit code %d: %s" %
+                        (error_code, "".join(proc.stderr.readlines())))
+
     # Create a thread to echo output from the GatewayServer, which is required
     # for Java log output to show up:
     class EchoOutputThread(Thread):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala

Lines changed: 2 additions & 2 deletions
@@ -68,7 +68,7 @@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
 }

 object BindReferences extends Logging {
-  def bindReference(expression: Expression, input: Seq[Attribute]): Expression = {
+  def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = {
     expression.transform { case a: AttributeReference =>
       attachTree(a, "Binding attribute") {
         val ordinal = input.indexWhere(_.exprId == a.exprId)
@@ -83,6 +83,6 @@ object BindReferences extends Logging {
           BoundReference(ordinal, a)
         }
       }
-    }
+    }.asInstanceOf[A] // Kind of a hack, but safe.  TODO: Tighten return type when possible.
   }
 }
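
The point of the new type parameter: Expression.transform returns the general Expression type, so the old signature lost the argument's static type at every call site; with A <: Expression (plus the asInstanceOf noted in the diff) binding a SortOrder yields a SortOrder, which the new RowOrdering constructor below depends on. A REPL-style sketch with stand-in types (not catalyst's):

  trait Expr { def transformed: Expr = this }
  case class Sort(child: String) extends Expr

  def bindOld(e: Expr): Expr = e.transformed                      // result widened to Expr
  def bindNew[A <: Expr](e: A): A = e.transformed.asInstanceOf[A] // result keeps the caller's type

  val s: Sort = bindNew(Sort("a"))   // compiles without an explicit cast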

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala

Lines changed: 3 additions & 0 deletions
@@ -208,6 +208,9 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {


 class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
   def compare(a: Row, b: Row): Int = {
     var i = 0
     while (i < ordering.size) {
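
The auxiliary constructor binds each SortOrder against an input schema before rows are compared; the Exchange change below uses it roughly as:

  implicit val ordering = new RowOrdering(sortingExpressions, child.output)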

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 4 additions & 4 deletions
@@ -22,7 +22,7 @@ import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.sql.{SQLConf, SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{NoBind, MutableProjection, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -31,7 +31,7 @@ import org.apache.spark.util.MutablePair
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode with NoBind {

   override def outputPartitioning = newPartitioning

@@ -42,7 +42,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
         val rdd = child.execute().mapPartitions { iter =>
-          val hashExpressions = new MutableProjection(expressions)
+          val hashExpressions = new MutableProjection(expressions, child.output)
           val mutablePair = new MutablePair[Row, Row]()
           iter.map(r => mutablePair.update(hashExpressions(r), r))
         }
@@ -53,7 +53,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una

       case RangePartitioning(sortingExpressions, numPartitions) =>
         // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions)
+        implicit val ordering = new RowOrdering(sortingExpressions, child.output)

         val rdd = child.execute().mapPartitions { iter =>
           val mutablePair = new MutablePair[Row, Null](null, null)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 3 deletions
@@ -250,9 +250,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.SetCommand(key, value) =>
         Seq(execution.SetCommand(key, value, plan.output)(context))
-      case logical.ExplainCommand(child) =>
-        val sparkPlan = context.executePlan(child).sparkPlan
-        Seq(execution.ExplainCommand(sparkPlan, plan.output)(context))
+      case logical.ExplainCommand(logicalPlan) =>
+        Seq(execution.ExplainCommand(logicalPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
         Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 12 additions & 4 deletions
@@ -21,6 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

 trait Command {
   /**
@@ -71,16 +72,23 @@ case class SetCommand(
 }

 /**
+ * An explain command for users to see how a command will be executed.
+ *
+ * Note that this command takes in a logical plan, runs the optimizer on the logical plan
+ * (but do NOT actually execute it).
+ *
  * :: DeveloperApi ::
  */
 @DeveloperApi
 case class ExplainCommand(
-    child: SparkPlan, output: Seq[Attribute])(
+    logicalPlan: LogicalPlan, output: Seq[Attribute])(
     @transient context: SQLContext)
-  extends UnaryNode with Command {
+  extends LeafNode with Command {

-  // Actually "EXPLAIN" command doesn't cause any side effect.
-  override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
+  // Run through the optimizer to generate the physical plan.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = {
+    "Physical execution plan:" +: context.executePlan(logicalPlan).executedPlan.toString.split("\n")
+  }

   def execute(): RDD[Row] = {
     val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
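
With this change EXPLAIN runs the logical plan through the optimizer and planner but never executes it, and the first result row is the "Physical execution plan:" header (which the HiveQuerySuite change below checks for). A hedged sketch, assuming a HiveContext named hiveContext and an existing table src:

  val explanation = hiveContext.hql("EXPLAIN SELECT key FROM src").collect()
  // Each element is a single-column Row; the first one's value starts with
  // "Physical execution plan:", followed by one row per line of the plan tree.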

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 1 addition & 4 deletions
@@ -202,12 +202,9 @@ class HiveQuerySuite extends HiveComparisonTest {
     }
   }

-  private val explainCommandClassName =
-    classOf[execution.ExplainCommand].getSimpleName.stripSuffix("$")
-
   def isExplanation(result: SchemaRDD) = {
     val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
-    explanation.size > 1 && explanation.head.startsWith(explainCommandClassName)
+    explanation.size > 1 && explanation.head.startsWith("Physical execution plan")
   }

   test("SPARK-1704: Explain commands as a SchemaRDD") {
