
Commit c795342

Merge remote-tracking branch 'upstream/master' into expr_binary_log
2 parents f373bac + 57c60c5 commit c795342

14 files changed: 143 additions & 120 deletions

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@ function renderDagVizForJob(svgContainer) {
     // them separately later. Note that we cannot draw them now because we need to
     // put these edges in a separate container that is on top of all stage graphs.
     metadata.selectAll(".incoming-edge").each(function(v) {
-      var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
+      var edge = d3.select(this).text().trim().split(","); // e.g. 3,4 => [3, 4]
       crossStageEdges.push(edge);
     });
   });
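The only change here is the added `.trim()`. A tiny Scala analogue (a sketch, not the project's code) of why it matters: whitespace around the text pulled from the DOM node would otherwise survive the split and end up inside the stage IDs.

    object TrimSplitSketch {
      def main(args: Array[String]): Unit = {
        val raw = "  3,4 " // text extracted from a node often carries surrounding whitespace
        println(raw.split(",").toList)      // List(  3, 4 )  -- the pieces keep the whitespace
        println(raw.trim.split(",").toList) // List(3, 4)     -- clean IDs, as the comment above expects
      }
    }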

docs/hadoop-provided.md

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+---
+layout: global
+displayTitle: Using Spark's "Hadoop Free" Build
+title: Using Spark's "Hadoop Free" Build
+---
+
+Spark uses Hadoop client libraries for HDFS and YARN. Starting in version Spark 1.4, the project packages "Hadoop free" builds that lets you more easily connect a single Spark binary to any Hadoop version. To use these builds, you need to modify `SPARK_DIST_CLASSPATH` to include Hadoop's package jars. The most convenient place to do this is by adding an entry in `conf/spark-env.sh`.
+
+This page describes how to connect Spark to Hadoop for different types of distributions.
+
+# Apache Hadoop
+For Apache distributions, you can use Hadoop's 'classpath' command. For instance:
+
+{% highlight bash %}
+### in conf/spark-env.sh ###
+
+# If 'hadoop' binary is on your PATH
+export SPARK_DIST_CLASSPATH=$(hadoop classpath)
+
+# With explicit path to 'hadoop' binary
+export SPARK_DIST_CLASSPATH=$(/path/to/hadoop/bin/hadoop classpath)
+
+# Passing a Hadoop configuration directory
+export SPARK_DIST_CLASSPATH=$(hadoop classpath --config /path/to/configs)
+
+{% endhighlight %}

docs/index.md

Lines changed: 7 additions & 3 deletions
@@ -12,9 +12,13 @@ It also supports a rich set of higher-level tools including [Spark SQL](sql-prog
 
 # Downloading
 
-Get Spark from the [downloads page](http://spark.apache.org/downloads.html) of the project website. This documentation is for Spark version {{site.SPARK_VERSION}}. The downloads page
-contains Spark packages for many popular HDFS versions. If you'd like to build Spark from
-scratch, visit [Building Spark](building-spark.html).
+Get Spark from the [downloads page](http://spark.apache.org/downloads.html) of the project website. This documentation is for Spark version {{site.SPARK_VERSION}}. Spark uses Hadoop's client libraries for HDFS and YARN. Downloads are pre-packaged for a handful of popular Hadoop versions.
+Users can also download a "Hadoop free" binary and run Spark with any Hadoop version
+[by augmenting Spark's classpath](hadoop-provided.html).
+
+If you'd like to build Spark from
+source, visit [Building Spark](building-spark.html).
 
 Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy to run
 locally on one machine --- all you need is to have `java` installed on your system `PATH`,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,6 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected val FULL = Keyword("FULL")
   protected val GROUP = Keyword("GROUP")
   protected val HAVING = Keyword("HAVING")
-  protected val IF = Keyword("IF")
   protected val IN = Keyword("IN")
   protected val INNER = Keyword("INNER")
   protected val INSERT = Keyword("INSERT")
@@ -277,6 +276,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
         lexical.normalizeKeyword(udfName) match {
           case "sum" => SumDistinct(exprs.head)
           case "count" => CountDistinct(exprs)
+          case _ => throw new AnalysisException(s"function $udfName does not support DISTINCT")
         }
       }
     | APPROXIMATE ~> ident ~ ("(" ~ DISTINCT ~> expression <~ ")") ^^ { case udfName ~ exp =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 5 additions & 4 deletions
@@ -460,7 +460,7 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case q: LogicalPlan =>
         q transformExpressions {
-          case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
+          case u @ UnresolvedFunction(name, children) =>
             withPosition(u) {
               registry.lookupFunction(name, children)
             }
@@ -494,20 +494,21 @@
   object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _))
-          if aggregate.resolved && containsAggregate(havingCondition) => {
+          if aggregate.resolved && containsAggregate(havingCondition) =>
+
         val evaluatedCondition = Alias(havingCondition, "havingCondition")()
         val aggExprsWithHaving = evaluatedCondition +: originalAggExprs
 
         Project(aggregate.output,
           Filter(evaluatedCondition.toAttribute,
             aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
-      }
     }
 
-    protected def containsAggregate(condition: Expression): Boolean =
+    protected def containsAggregate(condition: Expression): Boolean = {
       condition
         .collect { case ae: AggregateExpression => ae }
         .nonEmpty
+    }
   }
 
   /**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 17 additions & 11 deletions
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
 }
 
 class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  val tables = new mutable.HashMap[String, LogicalPlan]()
+  val tables = new ConcurrentHashMap[String, LogicalPlan]
 
   override def registerTable(
       tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables += ((getDbTableName(tableIdent), plan))
+    tables.put(getDbTableName(tableIdent), plan)
   }
 
   override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables -= getDbTableName(tableIdent)
+    tables.remove(getDbTableName(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
@@ -101,18 +105,18 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
 
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.get(getDbTableName(tableIdent)) match {
-      case Some(_) => true
-      case None => false
-    }
+    tables.containsKey(getDbTableName(tableIdent))
  }
 
   override def lookupRelation(
       tableIdentifier: Seq[String],
       alias: Option[String] = None): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val tableFullName = getDbTableName(tableIdent)
-    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+    val table = tables.get(tableFullName)
+    if (table == null) {
+      sys.error(s"Table Not Found: $tableFullName")
+    }
     val tableWithQualifiers = Subquery(tableIdent.last, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    tables.map {
-      case (name, _) => (name, true)
-    }.toSeq
+    val result = ArrayBuffer.empty[(String, Boolean)]
+    for (name <- tables.keySet()) {
+      result += ((name, true))
+    }
+    result
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
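Switching `tables` from `mutable.HashMap` to `ConcurrentHashMap` trades Scala's Option-returning API for Java's null-returning one, which is why `lookupRelation` now needs an explicit null check and `getTables` walks the Java key set. A standalone sketch of the same pattern, using a hypothetical `Plan` stand-in rather than Spark's `LogicalPlan`:

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.mutable.ArrayBuffer

    object CatalogSketch {
      case class Plan(name: String) // hypothetical stand-in, not a Spark class

      class SimpleCatalog {
        private val tables = new ConcurrentHashMap[String, Plan]

        def registerTable(name: String, plan: Plan): Unit = tables.put(name, plan)
        def unregisterTable(name: String): Unit = tables.remove(name)
        def tableExists(name: String): Boolean = tables.containsKey(name)

        def lookupRelation(name: String): Plan = {
          val table = tables.get(name) // ConcurrentHashMap returns null, not an Option
          if (table == null) {
            sys.error(s"Table Not Found: $name")
          }
          table
        }

        def getTables: Seq[String] = {
          // Walk the Java key set without relying on JavaConversions.
          val result = ArrayBuffer.empty[String]
          val it = tables.keySet().iterator()
          while (it.hasNext) result += it.next()
          result
        }
      }

      def main(args: Array[String]): Unit = {
        val catalog = new SimpleCatalog
        // Individual put/remove/containsKey calls are thread-safe, so concurrent
        // registrations need no extra locking around the map itself.
        val threads = (1 to 4).map { i =>
          new Thread(new Runnable {
            def run(): Unit = catalog.registerTable(s"t$i", Plan(s"t$i"))
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
        println(catalog.getTables.sorted) // ArrayBuffer(t1, t2, t3, t4)
      }
    }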

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 9 additions & 3 deletions
@@ -35,16 +35,16 @@ trait FunctionRegistry {
   def lookupFunction(name: String, children: Seq[Expression]): Expression
 }
 
-trait OverrideFunctionRegistry extends FunctionRegistry {
+class OverrideFunctionRegistry(underlying: FunctionRegistry) extends FunctionRegistry {
 
   private val functionBuilders = StringKeyHashMap[FunctionBuilder](caseSensitive = false)
 
   override def registerFunction(name: String, builder: FunctionBuilder): Unit = {
     functionBuilders.put(name, builder)
   }
 
-  abstract override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
-    functionBuilders.get(name).map(_(children)).getOrElse(super.lookupFunction(name, children))
+  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    functionBuilders.get(name).map(_(children)).getOrElse(underlying.lookupFunction(name, children))
   }
 }
 
@@ -134,6 +134,12 @@ object FunctionRegistry {
     expression[Sum]("sum")
   )
 
+  val builtin: FunctionRegistry = {
+    val fr = new SimpleFunctionRegistry
+    expressions.foreach { case (name, builder) => fr.registerFunction(name, builder) }
+    fr
+  }
+
   /** See usage above. */
   private def expression[T <: Expression](name: String)
       (implicit tag: ClassTag[T]): (String, FunctionBuilder) = {
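`OverrideFunctionRegistry` goes from a stackable trait (`abstract override` calling `super`) to a plain class that delegates to an explicit `underlying` registry, and `FunctionRegistry.builtin` becomes a single pre-populated registry that callers can wrap. A standalone sketch of that wrap-and-delegate shape, with simplified stand-in types rather than Spark's actual `FunctionRegistry` API:

    import scala.collection.mutable

    object OverrideRegistrySketch {
      // Hypothetical simplification: builders take plain Ints instead of Expressions.
      type FunctionBuilder = Seq[Int] => Int

      trait Registry {
        def registerFunction(name: String, builder: FunctionBuilder): Unit
        def lookupFunction(name: String, args: Seq[Int]): Int
      }

      class SimpleRegistry extends Registry {
        private val builders = mutable.Map.empty[String, FunctionBuilder]
        override def registerFunction(name: String, builder: FunctionBuilder): Unit =
          builders(name.toLowerCase) = builder
        override def lookupFunction(name: String, args: Seq[Int]): Int =
          builders.getOrElse(name.toLowerCase, sys.error(s"undefined function $name"))(args)
      }

      // Local registrations win; anything else falls through to the wrapped registry,
      // the same shape as OverrideFunctionRegistry(underlying).
      class OverrideRegistry(underlying: Registry) extends Registry {
        private val overrides = mutable.Map.empty[String, FunctionBuilder]
        override def registerFunction(name: String, builder: FunctionBuilder): Unit =
          overrides(name.toLowerCase) = builder
        override def lookupFunction(name: String, args: Seq[Int]): Int =
          overrides.get(name.toLowerCase).map(_(args))
            .getOrElse(underlying.lookupFunction(name, args))
      }

      def main(args: Array[String]): Unit = {
        val builtin = new SimpleRegistry // built once and shared, like FunctionRegistry.builtin
        builtin.registerFunction("sum", _.sum)

        val perContext = new OverrideRegistry(builtin) // one cheap wrapper per context
        perContext.registerFunction("max", _.max)

        println(perContext.lookupFunction("sum", Seq(1, 2, 3))) // 6, from the shared built-ins
        println(perContext.lookupFunction("max", Seq(1, 2, 3))) // 3, from the local override
      }
    }

The SQLContext change further down is the usage side of this: its lazy `functionRegistry` simply becomes `new OverrideFunctionRegistry(FunctionRegistry.builtin)`.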

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 4 additions & 4 deletions
@@ -25,10 +25,10 @@ import org.apache.spark.sql.types._
 
 
 /**
- * For Catalyst to work correctly, concrete implementations of [[Expression]]s must be case classes
- * whose constructor arguments are all Expressions types. In addition, if we want to support more
- * than one constructor, define those constructors explicitly as apply methods in the companion
- * object.
+ * If an expression wants to be exposed in the function registry (so users can call it with
+ * "name(arguments...)", the concrete implementation must be a case class whose constructor
+ * arguments are all Expressions types. In addition, if it needs to support more than one
+ * constructor, define those constructors explicitly as apply methods in the companion object.
  *
  * See [[Substring]] for an example.
  */
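The reworded scaladoc ties the case-class convention to the function registry: an expression can only be built from `name(arguments...)` if its constructor takes nothing but `Expression`s, with any extra arity exposed as an `apply` on the companion object. A minimal sketch of that shape, using a simplified stand-in `Expression` hierarchy rather than catalyst's:

    object ExpressionShapeSketch {
      // Simplified stand-ins; not Spark's catalyst classes.
      trait Expression { def eval(): Any }
      case class Literal(value: Any) extends Expression { override def eval(): Any = value }

      // Every constructor argument is an Expression, so a registry could build it
      // uniformly from a Seq[Expression].
      case class Substr(str: Expression, pos: Expression, len: Expression) extends Expression {
        override def eval(): Any = {
          val s = str.eval().toString
          s.drop(pos.eval().asInstanceOf[Int]).take(len.eval().asInstanceOf[Int])
        }
      }

      // The two-argument form lives on the companion object as an apply method,
      // not as a second class constructor.
      object Substr {
        def apply(str: Expression, pos: Expression): Substr =
          new Substr(str, pos, Literal(Int.MaxValue))
      }

      def main(args: Array[String]): Unit = {
        println(Substr(Literal("spark sql"), Literal(0), Literal(5)).eval()) // spark
        println(Substr(Literal("spark sql"), Literal(6)).eval())             // sql
      }
    }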

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 5 deletions
@@ -120,11 +120,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   // TODO how to handle the temp function per user session?
   @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = {
-    val fr = new SimpleFunctionRegistry
-    FunctionRegistry.expressions.foreach { case (name, func) => fr.registerFunction(name, func) }
-    fr
-  }
+  protected[sql] lazy val functionRegistry: FunctionRegistry =
+    new OverrideFunctionRegistry(FunctionRegistry.builtin)
 
   @transient
   protected[sql] lazy val analyzer: Analyzer =

sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala

Lines changed: 36 additions & 30 deletions
@@ -57,7 +57,7 @@ private[spark] case class PythonUDF(
   def nullable: Boolean = true
 
   override def eval(input: Row): Any = {
-    sys.error("PythonUDFs can not be directly evaluated.")
+    throw new UnsupportedOperationException("PythonUDFs can not be directly evaluated.")
   }
 }
 
@@ -71,43 +71,49 @@ private[spark] case class PythonUDF(
 private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Skip EvaluatePython nodes.
-    case p: EvaluatePython => p
+    case plan: EvaluatePython => plan
 
-    case l: LogicalPlan =>
+    case plan: LogicalPlan =>
       // Extract any PythonUDFs from the current operator.
-      val udfs = l.expressions.flatMap(_.collect { case udf: PythonUDF => udf})
+      val udfs = plan.expressions.flatMap(_.collect { case udf: PythonUDF => udf })
       if (udfs.isEmpty) {
         // If there aren't any, we are done.
-        l
+        plan
      } else {
         // Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
         // If there is more than one, we will add another evaluation operator in a subsequent pass.
-        val udf = udfs.head
-
-        var evaluation: EvaluatePython = null
-
-        // Rewrite the child that has the input required for the UDF
-        val newChildren = l.children.map { child =>
-          // Check to make sure that the UDF can be evaluated with only the input of this child.
-          // Other cases are disallowed as they are ambiguous or would require a cartisian product.
-          if (udf.references.subsetOf(child.outputSet)) {
-            evaluation = EvaluatePython(udf, child)
-            evaluation
-          } else if (udf.references.intersect(child.outputSet).nonEmpty) {
-            sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
-          } else {
-            child
-          }
+        udfs.find(_.resolved) match {
+          case Some(udf) =>
+            var evaluation: EvaluatePython = null
+
+            // Rewrite the child that has the input required for the UDF
+            val newChildren = plan.children.map { child =>
+              // Check to make sure that the UDF can be evaluated with only the input of this child.
+              // Other cases are disallowed as they are ambiguous or would require a cartesian
+              // product.
+              if (udf.references.subsetOf(child.outputSet)) {
+                evaluation = EvaluatePython(udf, child)
+                evaluation
+              } else if (udf.references.intersect(child.outputSet).nonEmpty) {
+                sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
+              } else {
+                child
+              }
+            }
+
+            assert(evaluation != null, "Unable to evaluate PythonUDF. Missing input attributes.")
+
+            // Trim away the new UDF value if it was only used for filtering or something.
+            logical.Project(
+              plan.output,
+              plan.transformExpressions {
+                case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute
+              }.withNewChildren(newChildren))
+
+          case None =>
+            // If there is no Python UDF that is resolved, skip this round.
+            plan
         }
-
-        assert(evaluation != null, "Unable to evaluate PythonUDF. Missing input attributes.")
-
-        // Trim away the new UDF value if it was only used for filtering or something.
-        logical.Project(
-          l.output,
-          l.transformExpressions {
-            case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute
-          }.withNewChildren(newChildren))
       }
   }
 }
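The behavioural change buried in the rewrite above: instead of always grabbing `udfs.head`, the rule now looks for the first resolved UDF and leaves the plan untouched when none is resolved yet, so the rule can simply fire again on a later pass. A standalone sketch of that control flow, using hypothetical stand-in types rather than catalyst's `LogicalPlan`/`PythonUDF`:

    object ExtractUdfSketch {
      // Hypothetical stand-ins; not Spark classes.
      case class Udf(name: String, resolved: Boolean)
      case class Plan(udfs: Seq[Udf], steps: Seq[String])

      // One rewrite pass: extract at most one resolved UDF, otherwise return the
      // plan unchanged (either nothing to extract, or nothing resolved yet).
      def extractOne(plan: Plan): Plan = {
        if (plan.udfs.isEmpty) {
          plan
        } else {
          plan.udfs.find(_.resolved) match {
            case Some(udf) =>
              Plan(plan.udfs.filterNot(_ == udf), plan.steps :+ s"evaluate(${udf.name})")
            case None =>
              plan // skip this round; a later pass may find it resolved
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val plan = Plan(Seq(Udf("f", resolved = false), Udf("g", resolved = true)), Seq("scan"))
        println(extractOne(plan)) // extracts g, keeps the unresolved f for a later pass
      }
    }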