Commit 061880f

Addressed all comments by @pwendell
1 parent 7755062 commit 061880f

16 files changed: 73 additions, 128 deletions

16 files changed

+73
-128
lines changed

assembly/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -163,6 +163,11 @@
           <artifactId>spark-hive_${scala.binary.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hive-thriftserver</id>
+      <dependencies>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>

bin/spark-sql

Lines changed: 3 additions & 51 deletions
@@ -17,15 +17,8 @@
 # limitations under the License.
 #
 
-SCALA_VERSION=2.10
-
-cygwin=false
-case "`uname`" in
-    CYGWIN*) cygwin=true;;
-esac
-
-# Enter posix mode for bash
-set -o posix
+#
+# Shell script for starting the Spark SQL CLI
 
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
@@ -36,46 +29,5 @@ if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
   exit 0
 fi
 
-ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
-
-if [ -n "$JAVA_HOME" ]; then
-  JAR_CMD="$JAVA_HOME/bin/jar"
-else
-  JAR_CMD="jar"
-fi
-
-# Use spark-assembly jar from either RELEASE or assembly directory
-if [ -f "$FWDIR/RELEASE" ]; then
-  assembly_folder="$FWDIR"/lib
-else
-  assembly_folder="$ASSEMBLY_DIR"
-fi
-
-num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
-if [ "$num_jars" -eq "0" ]; then
-  echo "Failed to find Spark assembly in $assembly_folder"
-  echo "You need to build Spark before running this program."
-  exit 1
-fi
-if [ "$num_jars" -gt "1" ]; then
-  jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
-  echo "Found multiple Spark assembly jars in $assembly_folder:"
-  echo "$jars_list"
-  echo "Please remove all but one jar."
-  exit 1
-fi
-
-ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
-
-# Verify that versions of java used to build the jars and run Spark are compatible
-jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
-if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
-  echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
-  echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
-  echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
-  echo "or build Spark with Java 6." 1>&2
-  exit 1
-fi
-
 CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS $@ $ASSEMBLY_JAR
+exec "$FWDIR"/bin/spark-submit --class $CLASS $@ spark-internal

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 12 additions & 2 deletions
@@ -46,6 +46,10 @@ object SparkSubmit {
   private val CLUSTER = 2
   private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
 
+  // A special jar name that indicates the class being run is inside of Spark itself, and therefore
+  // no user jar is needed.
+  private val SPARK_INTERNAL = "spark-internal"
+
   // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
@@ -257,7 +261,9 @@
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (clusterManager == YARN && deployMode == CLUSTER) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", args.primaryResource)
+      if (args.primaryResource != SPARK_INTERNAL) {
+        childArgs += ("--jar", args.primaryResource)
+      }
       childArgs += ("--class", args.mainClass)
       if (args.childArgs != null) {
         args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
@@ -332,7 +338,7 @@
   * Return whether the given primary resource represents a user jar.
   */
  private def isUserJar(primaryResource: String): Boolean = {
-    !isShell(primaryResource) && !isPython(primaryResource)
+    !isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
  }
 
  /**
@@ -349,6 +355,10 @@
    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
  }
 
+  private[spark] def isInternal(primaryResource: String): Boolean = {
+    primaryResource == SPARK_INTERNAL
+  }
+
  /**
   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
   * no files, into a single comma-separated string.
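Illustration (not part of this commit): a minimal, self-contained Scala sketch of how the primary-resource predicates above classify the new "spark-internal" name. The object name and main method are hypothetical; only the predicate logic mirrors the SparkSubmit change.

    object ResourceClassificationSketch {
      val SPARK_SHELL = "spark-shell"
      val PYSPARK_SHELL = "pyspark-shell"
      val SPARK_INTERNAL = "spark-internal"

      def isShell(r: String): Boolean = r == SPARK_SHELL || r == PYSPARK_SHELL
      def isPython(r: String): Boolean = r.endsWith(".py") || r == PYSPARK_SHELL
      def isInternal(r: String): Boolean = r == SPARK_INTERNAL
      def isUserJar(r: String): Boolean = !isShell(r) && !isPython(r) && !isInternal(r)

      def main(args: Array[String]): Unit = {
        // "spark-internal" is not treated as a user jar, so yarn-cluster mode skips the --jar argument.
        Seq("myApp.jar", "script.py", "spark-shell", "spark-internal").foreach { r =>
          println(s"$r -> isUserJar=${isUserJar(r)}, isInternal=${isInternal(r)}")
        }
      }
    }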

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 1 addition & 1 deletion
@@ -318,7 +318,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         SparkSubmit.printErrorAndExit(errMessage)
       case v =>
         primaryResource =
-          if (!SparkSubmit.isShell(v)) {
+          if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
             Utils.resolveURI(v).toString
           } else {
             v

docs/sql-programming-guide.md

Lines changed: 5 additions & 3 deletions
@@ -605,7 +605,7 @@ You may also use the beeline script comes with Hive.
 
 #### Reducer number
 
-In Shark, default reducer number is 1, and can be tuned by property `mapred.reduce.tasks`. In Spark SQL, reducer number is default to 200, and can be customized by the `spark.sql.shuffle.partitions` property:
+In Shark, the default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark SQL deprecates this property in favor of a new property, `spark.sql.shuffle.partitions`, whose default value is 200. Users may customize this property via `SET`:
 
 ```
 SET spark.sql.shuffle.partitions=10;
@@ -615,6 +615,8 @@ GROUP BY page ORDER BY c DESC LIMIT 10;
 
 You may also put this property in `hive-site.xml` to override the default value.
 
+For now, the `mapred.reduce.tasks` property is still recognized and is automatically converted to `spark.sql.shuffle.partitions`.
+
 #### Caching
 
 The `shark.cache` table property no longer exists, and tables whose name end with `_cached` are no longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to let user control table caching explicitly:
@@ -697,7 +699,7 @@ Spark SQL supports the vast majority of Hive features, such as:
 
 #### Unsupported Hive Functionality
 
-Below is a list of Hive features that we don't support yet.  Most of these features are rarely used in Hive deployments.
+Below is a list of Hive features that we don't support yet. Most of these features are rarely used in Hive deployments.
 
 **Major Hive Features**
 
@@ -723,7 +725,7 @@ A handful of Hive optimizations are not yet included in Spark. Some of these (su
 
 * Block level bitmap indexes and virtual columns (used to build indexes)
 * Automatically convert a join to map join: For joining a large table with multiple small tables, Hive automatically converts the join into a map join. We are adding this auto conversion in the next release.
-* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using "set mapred.reduce.tasks=[num_tasks];". We are going to add auto-setting of parallelism in the next release.
+* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using "SET spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the next release.
 * Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.
 * Skew data flag: Spark SQL does not follow the skew data flags in Hive.
 * `STREAMTABLE` hint in join: Spark SQL does not follow the `STREAMTABLE` hint.
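Illustration (not part of this commit): the documented `SET` statement can also be issued programmatically. A minimal sketch, assuming Spark SQL of this vintage is on the classpath and that SET statements are routed through `SQLContext.sql`; the application name and local master are arbitrary.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object ShufflePartitionsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("shuffle-partitions-sketch").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        // Same effect as running "SET spark.sql.shuffle.partitions=10;" in the SQL CLI:
        // subsequent joins and aggregations use 10 post-shuffle partitions.
        sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
        sc.stop()
      }
    }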

sbin/start-thriftserver.sh

Lines changed: 4 additions & 52 deletions
@@ -20,65 +20,17 @@
 #
 # Shell script for starting the Spark SQL Thrift server
 
-SCALA_VERSION=2.10
-
-cygwin=false
-case "`uname`" in
-    CYGWIN*) cygwin=true;;
-esac
-
 # Enter posix mode for bash
 set -o posix
 
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
 if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
   echo "Usage: ./sbin/start-thriftserver [options]"
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
 fi
 
-# Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
-
-ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"
-
-if [ -n "$JAVA_HOME" ]; then
-  JAR_CMD="$JAVA_HOME/bin/jar"
-else
-  JAR_CMD="jar"
-fi
-
-# Use spark-assembly jar from either RELEASE or assembly directory
-if [ -f "$FWDIR/RELEASE" ]; then
-  assembly_folder="$FWDIR"/lib
-else
-  assembly_folder="$ASSEMBLY_DIR"
-fi
-
-num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
-if [ "$num_jars" -eq "0" ]; then
-  echo "Failed to find Spark assembly in $assembly_folder"
-  echo "You need to build Spark before running this program."
-  exit 1
-fi
-if [ "$num_jars" -gt "1" ]; then
-  jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
-  echo "Found multiple Spark assembly jars in $assembly_folder:"
-  echo "$jars_list"
-  echo "Please remove all but one jar."
-  exit 1
-fi
-
-ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
-
-# Verify that versions of java used to build the jars and run Spark are compatible
-jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
-if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
-  echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
-  echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
-  echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
-  echo "or build Spark with Java 6." 1>&2
-  exit 1
-fi
-
 CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
-exec "$FWDIR"/bin/spark-submit --class $CLASS $@ $ASSEMBLY_JAR
+exec "$FWDIR"/bin/spark-submit --class $CLASS $@ spark-internal

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 15 additions & 5 deletions
@@ -30,24 +30,24 @@ import scala.collection.JavaConverters._
  * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
  */
 trait SQLConf {
+  import SQLConf._
 
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
 
   /** Number of partitions to use for shuffle operators. */
-  private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
+  private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
    * a broadcast value during the physical executions of join operations. Setting this to 0
    * effectively disables auto conversion.
    * Hive setting: hive.auto.convert.join.noconditionaltask.size.
    */
-  private[spark] def autoConvertJoinSize: Int =
-    get("spark.sql.auto.convert.join.size", "10000").toInt
+  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
 
   /** A comma-separated list of table names marked to be broadcasted during joins. */
-  private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")
+  private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
 
   /** ********************** SQLConf functionality methods ************ */
 
@@ -61,7 +61,7 @@ trait SQLConf {
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for ${key}")
+    require(value != null, s"value cannot be null for $key")
     settings.put(key, value)
   }
 
@@ -90,3 +90,13 @@ trait SQLConf {
   }
 
 }
+
+object SQLConf {
+  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}
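Illustration (not part of this commit): with the key names centralized in object SQLConf, callers can reference constants instead of repeating string literals. A minimal sketch, assuming the trait's get/set methods are reachable through a SQLContext as the SetCommand change suggests; the object name and local master are arbitrary.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLConf, SQLContext}

    object SQLConfKeysSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sqlconf-keys-sketch").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        // The constant resolves to "spark.sql.shuffle.partitions", so typos fail at compile time.
        sqlContext.set(SQLConf.SHUFFLE_PARTITIONS, "10")
        println(sqlContext.get(SQLConf.SHUFFLE_PARTITIONS))   // 10
        println(SQLConf.Deprecated.MAPRED_REDUCE_TASKS)       // mapred.reduce.tasks
        sc.stop()
      }
    }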

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 12 additions & 4 deletions
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
 trait Command {
   /**
@@ -44,13 +45,20 @@ trait Command {
 case class SetCommand(
     key: Option[String], value: Option[String], output: Seq[Attribute])(
     @transient context: SQLContext)
-  extends LeafNode with Command {
+  extends LeafNode with Command with Logging {
 
   override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
     // Set value for key k.
     case (Some(k), Some(v)) =>
-      context.set(k, v)
-      Array(s"$k=$v")
+      if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
+        logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+        context.set(SQLConf.SHUFFLE_PARTITIONS, v)
+        Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+      } else {
+        context.set(k, v)
+        Array(s"$k=$v")
+      }
 
     // Query the value bound to key k.
     case (Some(k), _) =>
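Illustration (not part of this commit): what the deprecation path above looks like from user code. A minimal sketch, assuming SET statements reach SetCommand via SQLContext.sql; setting the old key should log the warning and populate spark.sql.shuffle.partitions instead.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLConf, SQLContext}

    object DeprecatedSetKeySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("deprecated-set-sketch").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        // Logs a deprecation warning and redirects the value to spark.sql.shuffle.partitions.
        sqlContext.sql("SET mapred.reduce.tasks=10").collect()
        println(sqlContext.get(SQLConf.SHUFFLE_PARTITIONS))  // expected: 10
        sc.stop()
      }
    }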

sql/hive-thriftserver/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -70,6 +70,13 @@
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
  * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
  * `HiveThriftServer2` thrift server.
  */
-object HiveThriftServer2 extends Logging {
+private[hive] object HiveThriftServer2 extends Logging {
   var LOG = LogFactory.getLog(classOf[HiveServer2])
 
   def main(args: Array[String]) {
@@ -73,7 +73,7 @@ object HiveThriftServer2 extends Logging {
   }
 }
 
-class HiveThriftServer2(hiveContext: HiveContext)
+private[hive] class HiveThriftServer2(hiveContext: HiveContext)
   extends HiveServer2
   with ReflectedCompositeService {
 