Commit 8d7b82f

Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase
2 parents 1048549 + 7dd9fc6 commit 8d7b82f

40 files changed, +326 -199 lines

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 7 additions & 4 deletions
@@ -139,10 +139,13 @@ class HadoopRDD[K, V](
       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      val newJobConf = new JobConf(broadcastedConf.value.value)
-      initLocalJobConfFuncOpt.map(f => f(newJobConf))
-      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      newJobConf
+      // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
+      broadcastedConf.synchronized {
+        val newJobConf = new JobConf(broadcastedConf.value.value)
+        initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        newJobConf
+      }
     }
   }
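
The change above serializes JobConf construction on the broadcast variable. As a rough illustration of why the lock is needed (this sketch is not part of the commit; the object and method names are made up), Hadoop's Configuration/JobConf copy constructor iterates over the source object's properties, so cloning a configuration that another thread may be touching can throw ConcurrentModificationException (HADOOP-10456); taking a shared lock around the clone removes the race:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Illustrative sketch only: mirrors the pattern HadoopRDD now uses, where every
// clone of the broadcast configuration happens under the same monitor so that no
// two threads walk the source's properties at the same time.
object SafeJobConfClone {
  def cloneConf(shared: Configuration): JobConf =
    shared.synchronized {
      new JobConf(shared) // the copy constructor reads all of shared's entries
    }
}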

core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala

Lines changed: 19 additions & 12 deletions
@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
     if (numSlices < 1) {
       throw new IllegalArgumentException("Positive number of slices required")
     }
+    // Sequences need to be sliced at the same set of index positions for operations
+    // like RDD.zip() to behave as expected
+    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
+      (0 until numSlices).iterator.map(i => {
+        val start = ((i * length) / numSlices).toInt
+        val end = (((i + 1) * length) / numSlices).toInt
+        (start, end)
+      })
+    }
     seq match {
       case r: Range.Inclusive => {
         val sign = if (r.step < 0) {
@@ -128,30 +137,28 @@
           r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
       }
       case r: Range => {
-        (0 until numSlices).map(i => {
-          val start = ((i * r.length.toLong) / numSlices).toInt
-          val end = (((i + 1) * r.length.toLong) / numSlices).toInt
-          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
-        }).asInstanceOf[Seq[Seq[T]]]
+        positions(r.length, numSlices).map({
+          case (start, end) =>
+            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+        }).toSeq.asInstanceOf[Seq[Seq[T]]]
       }
       case nr: NumericRange[_] => {
         // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
-        val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
         var r = nr
-        for (i <- 0 until numSlices) {
+        for ((start, end) <- positions(nr.length, numSlices)) {
+          val sliceSize = end - start
           slices += r.take(sliceSize).asInstanceOf[Seq[T]]
           r = r.drop(sliceSize)
         }
         slices
       }
       case _ => {
         val array = seq.toArray // To prevent O(n^2) operations for List etc
-        (0 until numSlices).map(i => {
-          val start = ((i * array.length.toLong) / numSlices).toInt
-          val end = (((i + 1) * array.length.toLong) / numSlices).toInt
-          array.slice(start, end).toSeq
-        })
+        positions(array.length, numSlices).map({
+          case (start, end) =>
+            array.slice(start, end).toSeq
+        }).toSeq
       }
     }
   }
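
To make the slicing arithmetic concrete, here is a standalone copy of the positions helper introduced above (the real one is local to ParallelCollectionRDD.slice), together with the boundaries it produces for a length-7 sequence split into 4 slices; every Seq type now slices at these same boundaries, which is what keeps RDD.zip() partitions aligned:

// Standalone sketch of the helper added in this commit.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

// For length = 7 and numSlices = 4 the boundaries are (0,1), (1,3), (3,5), (5,7),
// i.e. slice sizes 1, 2, 2, 2, whether the data is a Range, a NumericRange or a List.
positions(7L, 4).foreach(println)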

core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -111,6 +111,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices.forall(_.isInstanceOf[Range]))
   }
 
+  test("identical slice sizes between Range and NumericRange") {
+    val r = ParallelCollectionRDD.slice(1 to 7, 4)
+    val nr = ParallelCollectionRDD.slice(1L to 7L, 4)
+    assert(r.size === 4)
+    for (i <- 0 until r.size) {
+      assert(r(i).size === nr(i).size)
+    }
+  }
+
+  test("identical slice sizes between List and NumericRange") {
+    val r = ParallelCollectionRDD.slice(List(1, 2), 4)
+    val nr = ParallelCollectionRDD.slice(1L to 2L, 4)
+    assert(r.size === 4)
+    for (i <- 0 until r.size) {
+      assert(r(i).size === nr(i).size)
+    }
+  }
+
   test("large ranges don't overflow") {
     val N = 100 * 1000 * 1000
     val data = 0 until N

docs/programming-guide.md

Lines changed: 7 additions & 5 deletions
@@ -377,13 +377,15 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks.
 
-Apart from reading files as a collection of lines,
-`SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
+Apart from text files, Spark's Python API also supports several other data formats:
 
-### SequenceFile and Hadoop InputFormats
+* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
+
+* `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
 
-In addition to reading text files, PySpark supports reading ```SequenceFile```
-and any arbitrary ```InputFormat```.
+* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below.
+
+### SequenceFile and Hadoop InputFormats
 
 **Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach.
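
The (filename, content) behaviour described above works the same way in the Scala API; a minimal Scala sketch for orientation (the paths are placeholders and `sc` is assumed to be an existing SparkContext; `saveAsPickleFile`/`pickleFile` are Python-only and are not shown):

// Hypothetical paths, for illustration only.
val perLine = sc.textFile("hdfs://ns/logs/2014-06-03")        // one record per line
val perFile = sc.wholeTextFiles("hdfs://ns/logs/small-files") // (filename, content) pairs
val preview = perFile.map { case (name, content) => (name, content.take(80)) }.take(2)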

docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -950,7 +950,7 @@ is 200 milliseconds.
 
 An alternative to receiving data with multiple input streams / receivers is to explicitly repartition
 the input data stream (using `inputStream.repartition(<number of partitions>)`).
-This distributes the received batches of data across all the machines in the cluster
+This distributes the received batches of data across specified number of machines in the cluster
 before further processing.
 
 ### Level of Parallelism in Data Processing
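
A short Scala sketch of the repartition call being documented (host name, port and partition count are placeholders; `ssc` is assumed to be an existing StreamingContext):

// One socket receiver produces relatively few partitions per batch; repartition(10)
// spreads each received batch over 10 partitions before the heavy processing runs.
val lines = ssc.socketTextStream("stream-host", 9999)
val words = lines.repartition(10).flatMap(_.split(" "))
words.count().print()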

python/pyspark/sql.py

Lines changed: 3 additions & 1 deletion
@@ -16,6 +16,7 @@
 #
 
 from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 from py4j.protocol import Py4JError
 
@@ -346,7 +347,8 @@ def _toPython(self):
        # TODO: This is inefficient, we should construct the Python Row object
        # in Java land in the javaToPython function. May require a custom
        # pickle serializer in Pyrolite
-        return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+        return RDD(jrdd, self._sc, BatchedSerializer(
+            PickleSerializer())).map(lambda d: Row(d))
 
    # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
    # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 10 additions & 8 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
 import org.apache.spark.sql.catalyst.types.StringType
 
 /**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+  def output: Seq[Attribute] = Seq.empty
 }
 
 /**
  * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
 * commands that are passed directly to another system.
 */
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+  override def output =
+    Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}
 
 /**
 * Commands of the form "SET (key) (= value)".
 */
 case class SetCommand(key: Option[String], value: Option[String]) extends Command {
   override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
+    BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
 }
 
 /**
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
 case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+  override def output =
+    Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
 }
 
 /**
 * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
 */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command
-

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
-  
+
   test("joins: push down left outer join #1") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 34 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   *
   * @group userf
   */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
@@ -259,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] val planner = new SparkPlanner
 
   @transient
-  protected[sql] lazy val emptyResult =
-    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+  protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)
 
   /**
   * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -280,35 +272,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
-    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
-      case SetCommand(key, value) =>
-        // Only this case needs to be executed eagerly. The other cases will
-        // be taken care of when the actual results are being extracted.
-        // In the case of HiveContext, sqlConf is overridden to also pass the
-        // pair into its HiveConf.
-        if (key.isDefined && value.isDefined) {
-          set(key.get, value.get)
-        }
-        // It doesn't matter what we return here, since this is only used
-        // to force the evaluation to happen eagerly. To query the results,
-        // one must use SchemaRDD operations to extract them.
-        emptyResult
-      case _ => executedPlan.execute()
-    }
-
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = planner(optimizedPlan).next()
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = {
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => executedPlan.execute()
-      }
-    }
+    lazy val toRdd: RDD[Row] = executedPlan.execute()
 
     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   * TODO: We only support primitive types, add support for nested types.
   */
  private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
-    val schema = rdd.first.map { case (fieldName, obj) =>
+    val schema = rdd.first().map { case (fieldName, obj) =>
      val dataType = obj.getClass match {
        case c: Class[_] if c == classOf[java.lang.String] => StringType
        case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
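
In effect, sql() no longer forces eager execution: it only parses the statement and wraps the logical plan, and commands such as SET are handled by their own operators when the RDD is actually evaluated. A hedged usage sketch (the table name and query are made up; `sqlContext` is assumed to be an existing SQLContext):

// Nothing is planned or executed here; only parsing happens.
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 21")

// Planning and execution are triggered by the first action on the SchemaRDD.
val names = adults.collect()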

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 3 additions & 8 deletions
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
 @AlphaComponent
 class SchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient protected[spark] val logicalPlan: LogicalPlan)
+    @transient val baseLogicalPlan: LogicalPlan)
   extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
 
   def baseSchemaRDD = this
@@ -347,16 +347,11 @@ class SchemaRDD(
       val pickle = new Pickler
       iter.map { row =>
         val map: JMap[String, Any] = new java.util.HashMap
-        // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
-        // Ideally we should be able to pickle an object directly into a Python collection so we
-        // don't have to create an ArrayList every time.
-        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
         row.zip(fieldNames).foreach { case (obj, name) =>
           map.put(name, obj)
         }
-        arr.add(map)
-        pickle.dumps(arr)
-      }
+        map
+      }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
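
The javaToPython side now emits one pickled payload per group of 10 rows instead of one per row, matching the BatchedSerializer(PickleSerializer()) introduced in pyspark/sql.py above. A small standalone sketch of the grouping step (plain Scala collections, not the SchemaRDD code itself):

// grouped(10) turns an iterator of per-row maps into an iterator of 10-element
// batches, so each dumps() call would serialize a list of dicts on the Python side.
val rows    = Iterator.range(0, 25).map(i => Map("id" -> i))
val batches = rows.grouped(10).map(_.toArray)
println(batches.map(_.length).toList) // List(10, 10, 5)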
