
Commit b6a9aa1: merge master

2 parents: c8b4554 + 237b96b

15 files changed: +210 -106 lines


docs/running-on-yarn.md

Lines changed: 11 additions & 0 deletions

@@ -79,6 +79,17 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   <td>(none)</td>
   <td>
     Comma-separated list of files to be placed in the working directory of each executor.
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</code></td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.driver.memoryOverhead</code></td>
+  <td>384</code></td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
 </table>
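
The two new settings documented above are ordinary Spark configuration keys, so they can be supplied through SparkConf like any other value. A minimal sketch of how an application might set them, assuming a YARN deployment; the application name and memory figures below are illustrative, not taken from this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Ask YARN for extra off-heap headroom beyond the JVM heap sizes.
// 512 MB per executor and 384 MB for the driver are example values only.
val conf = new SparkConf()
  .setAppName("memory-overhead-example")
  .set("spark.executor.memory", "4g")
  .set("spark.yarn.executor.memoryOverhead", "512")
  .set("spark.yarn.driver.memoryOverhead", "384")

val sc = new SparkContext(conf)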

python/pyspark/sql.py

Lines changed: 21 additions & 1 deletion

@@ -77,12 +77,25 @@ def inferSchema(self, rdd):
         """Infer and apply a schema to an RDD of L{dict}s.

         We peek at the first row of the RDD to determine the fields names
-        and types, and then use that to extract all the dictionaries.
+        and types, and then use that to extract all the dictionaries. Nested
+        collections are supported, which include array, dict, list, set, and
+        tuple.

         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
         ... {"field1" : 3, "field2": "row3"}]
         True
+
+        >>> from array import array
+        >>> srdd = sqlCtx.inferSchema(nestedRdd1)
+        >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
+        ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
+        True
+
+        >>> srdd = sqlCtx.inferSchema(nestedRdd2)
+        >>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
+        ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
+        True
         """
         if (rdd.__class__ is SchemaRDD):
             raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)

@@ -413,6 +426,7 @@ def subtract(self, other, numPartitions=None):

 def _test():
     import doctest
+    from array import array
     from pyspark.context import SparkContext
     globs = globals().copy()
     # The small batch size here ensures that we see multiple batches,

@@ -422,6 +436,12 @@ def _test():
     globs['sqlCtx'] = SQLContext(sc)
     globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
         {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
+    globs['nestedRdd1'] = sc.parallelize([
+        {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
+        {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
+    globs['nestedRdd2'] = sc.parallelize([
+        {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
+        {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
     (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 19 additions & 10 deletions

@@ -298,19 +298,28 @@ class SQLContext(@transient val sparkContext: SparkContext)

   /**
    * Peek at the first row of the RDD and infer its schema.
-   * TODO: We only support primitive types, add support for nested types.
+   * TODO: consolidate this with the type system developed in SPARK-2060.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
+    import scala.collection.JavaConversions._
+    def typeFor(obj: Any): DataType = obj match {
+      case c: java.lang.String => StringType
+      case c: java.lang.Integer => IntegerType
+      case c: java.lang.Long => LongType
+      case c: java.lang.Double => DoubleType
+      case c: java.lang.Boolean => BooleanType
+      case c: java.util.List[_] => ArrayType(typeFor(c.head))
+      case c: java.util.Set[_] => ArrayType(typeFor(c.head))
+      case c: java.util.Map[_, _] =>
+        val (key, value) = c.head
+        MapType(typeFor(key), typeFor(value))
+      case c if c.getClass.isArray =>
+        val elem = c.asInstanceOf[Array[_]].head
+        ArrayType(typeFor(elem))
+      case c => throw new Exception(s"Object of type $c cannot be used")
+    }
     val schema = rdd.first().map { case (fieldName, obj) =>
-      val dataType = obj.getClass match {
-        case c: Class[_] if c == classOf[java.lang.String] => StringType
-        case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
-        case c: Class[_] if c == classOf[java.lang.Long] => LongType
-        case c: Class[_] if c == classOf[java.lang.Double] => DoubleType
-        case c: Class[_] if c == classOf[java.lang.Boolean] => BooleanType
-        case c => throw new Exception(s"Object of type $c cannot be used")
-      }
-      AttributeReference(fieldName, dataType, true)()
+      AttributeReference(fieldName, typeFor(obj), true)()
     }.toSeq

     val rowRdd = rdd.mapPartitions { iter =>
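
The new typeFor helper replaces the earlier switch on obj.getClass: it pattern-matches on the sample value itself and recurses into the elements of lists, sets, maps, and arrays, so nested collections map onto ArrayType and MapType. Below is a self-contained sketch of the same recursive idea, using stand-in type names rather than Spark's actual DataType hierarchy; every name in it is illustrative only.

// Minimal stand-in type model; these case classes are NOT Spark's DataType classes.
sealed trait DType
case object StrType extends DType
case object IntType extends DType
case object DblType extends DType
case class ArrType(element: DType) extends DType
case class MapDType(key: DType, value: DType) extends DType

// Recursively inspect a sample value: primitives map directly, collections
// recurse into their first element, mirroring the typeFor helper above.
def inferType(value: Any): DType = value match {
  case _: String => StrType
  case _: Int    => IntType
  case _: Double => DblType
  case s: Seq[_] => ArrType(inferType(s.head))
  case m: Map[_, _] =>
    val (k, v) = m.head
    MapDType(inferType(k), inferType(v))
  case other => throw new IllegalArgumentException(s"Unsupported value: $other")
}

// One sample row is enough to derive a field-name -> type mapping.
val row = Map("f1" -> Seq(1, 2), "f2" -> Map("row1" -> 1.0))
val schema = row.map { case (name, value) => name -> inferType(value) }
// schema: Map(f1 -> ArrType(IntType), f2 -> MapDType(StrType, DblType))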

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 4 additions & 12 deletions

@@ -40,19 +40,13 @@ class JavaSQLContext(val sqlContext: SQLContext) {
   /**
    * Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
    */
-  def sql(sqlQuery: String): JavaSchemaRDD = {
-    val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlQuery: String): JavaSchemaRDD =
+    new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))

   /**
    * :: Experimental ::
    * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
-   * a table. This registered table can be used as the target of future insertInto` operations.
+   * a table. This registered table can be used as the target of future `insertInto` operations.
    *
    * {{{
    *   JavaSQLContext sqlCtx = new JavaSQLContext(...)

@@ -62,7 +56,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * }}}
    *
    * @param beanClass A java bean class object that will be used to determine the schema of the
-   *                  parquet file. s
+   *                  parquet file.
    * @param path The path where the directory containing parquet metadata should be created.
    *             Data inserted into this table will also be stored at this location.
    * @param allowExisting When false, an exception will be thrown if this directory already exists.

@@ -100,14 +94,12 @@ class JavaSQLContext(val sqlContext: SQLContext) {
     new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
   }

-
   /**
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
     new JavaSchemaRDD(sqlContext, ParquetRelation(path))

-
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
    * during the lifetime of this instance of SQLContext.

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 2 additions & 2 deletions

@@ -83,8 +83,8 @@ case class ExplainCommand(
   override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")

   def execute(): RDD[Row] = {
-    val explanation = sideEffectResult.mkString("\n")
-    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
+    val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
+    context.sparkContext.parallelize(explanation, 1)
   }

   override def otherCopyArgs = context :: Nil

sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala

Lines changed: 2 additions & 8 deletions

@@ -31,12 +31,6 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
   /**
    * Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
    */
-  def hql(hqlQuery: String): JavaSchemaRDD = {
-    val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def hql(hqlQuery: String): JavaSchemaRDD =
+    new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
 }
sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala (new file)

Lines changed: 101 additions & 0 deletions

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.api.java
+
+import scala.util.Try
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.api.java.JavaSchemaRDD
+import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.test.TestSQLContext
+
+// Implicits
+import scala.collection.JavaConversions._
+
+class JavaHiveQLSuite extends FunSuite {
+  lazy val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
+
+  // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
+  lazy val javaHiveCtx = new JavaHiveContext(javaCtx) {
+    override val sqlContext = TestHive
+  }
+
+  ignore("SELECT * FROM src") {
+    assert(
+      javaHiveCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
+        TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
+  }
+
+  private val explainCommandClassName =
+    classOf[ExplainCommand].getSimpleName.stripSuffix("$")
+
+  def isExplanation(result: JavaSchemaRDD) = {
+    val explanation = result.collect().map(_.getString(0))
+    explanation.size > 1 && explanation.head.startsWith(explainCommandClassName)
+  }
+
+  ignore("Query Hive native command execution result") {
+    val tableName = "test_native_commands"
+
+    assertResult(0) {
+      javaHiveCtx.hql(s"DROP TABLE IF EXISTS $tableName").count()
+    }
+
+    assertResult(0) {
+      javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
+    }
+
+    javaHiveCtx.hql("SHOW TABLES").registerAsTable("show_tables")
+
+    assert(
+      javaHiveCtx
+        .hql("SELECT result FROM show_tables")
+        .collect()
+        .map(_.getString(0))
+        .contains(tableName))
+
+    assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
+      javaHiveCtx.hql(s"DESCRIBE $tableName").registerAsTable("describe_table")
+
+      javaHiveCtx
+        .hql("SELECT result FROM describe_table")
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
+        .toArray
+    }
+
+    assert(isExplanation(javaHiveCtx.hql(
+      s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
+
+    TestHive.reset()
+  }
+
+  ignore("Exactly once semantics for DDL and command statements") {
+    val tableName = "test_exactly_once"
+    val q0 = javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)")
+
+    // If the table was not created, the following assertion would fail
+    assert(Try(TestHive.table(tableName)).isSuccess)
+
+    // If the CREATE TABLE command got executed again, the following assertion would fail
+    assert(Try(q0.count()).isSuccess)
+  }
+}

sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala

Lines changed: 0 additions & 41 deletions
This file was deleted.

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 18 additions & 14 deletions

@@ -169,7 +169,7 @@ class HiveQuerySuite extends HiveComparisonTest {

   def isExplanation(result: SchemaRDD) = {
     val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
-    explanation.size == 1 && explanation.head.startsWith(explainCommandClassName)
+    explanation.size > 1 && explanation.head.startsWith(explainCommandClassName)
   }

   test("SPARK-1704: Explain commands as a SchemaRDD") {

@@ -184,25 +184,29 @@ class HiveQuerySuite extends HiveComparisonTest {
   test("Query Hive native command execution result") {
     val tableName = "test_native_commands"

-    val q0 = hql(s"DROP TABLE IF EXISTS $tableName")
-    assert(q0.count() == 0)
+    assertResult(0) {
+      hql(s"DROP TABLE IF EXISTS $tableName").count()
+    }

-    val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
-    assert(q1.count() == 0)
+    assertResult(0) {
+      hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
+    }

-    val q2 = hql("SHOW TABLES")
-    val tables = q2.select('result).collect().map { case Row(table: String) => table }
-    assert(tables.contains(tableName))
+    assert(
+      hql("SHOW TABLES")
+        .select('result)
+        .collect()
+        .map(_.getString(0))
+        .contains(tableName))

-    val q3 = hql(s"DESCRIBE $tableName")
     assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
-      q3.select('result).collect().map { case Row(fieldDesc: String) =>
-        fieldDesc.split("\t").map(_.trim)
-      }
+      hql(s"DESCRIBE $tableName")
+        .select('result)
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
     }

-    val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")
-    assert(isExplanation(q4))
+    assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))

     TestHive.reset()
   }

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 2 additions & 2 deletions

@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    capability.setMemory(args.amMemory + memoryOverhead)
     amContainer.setResource(capability)

     appContext.setQueue(args.amQueue)

@@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val minResMemory = newApp.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
       ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-        YarnAllocationHandler.MEMORY_OVERHEAD)
+        memoryOverhead)
     amMemory
   }
