
Commit ffb47cb

Merge remote-tracking branch 'upstream/master' into arrow-toPandas-stream-SPARK-23030
2 parents 2fe46f8 + 7ad18ee commit ffb47cb

File tree: 47 files changed, +1296 −273 lines


common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 15 additions & 17 deletions
@@ -77,16 +77,16 @@ public ByteBuffer nioByteBuffer() throws IOException {
         return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
       }
     } catch (IOException e) {
+      String errorMessage = "Error in reading " + this;
       try {
         if (channel != null) {
           long size = channel.size();
-          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
-            e);
+          errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
         }
       } catch (IOException ignored) {
         // ignore
       }
-      throw new IOException("Error in opening " + this, e);
+      throw new IOException(errorMessage, e);
     } finally {
       JavaUtils.closeQuietly(channel);
     }
@@ -95,26 +95,24 @@ public ByteBuffer nioByteBuffer() throws IOException {
   @Override
   public InputStream createInputStream() throws IOException {
     FileInputStream is = null;
+    boolean shouldClose = true;
     try {
       is = new FileInputStream(file);
       ByteStreams.skipFully(is, offset);
-      return new LimitedInputStream(is, length);
+      InputStream r = new LimitedInputStream(is, length);
+      shouldClose = false;
+      return r;
     } catch (IOException e) {
-      try {
-        if (is != null) {
-          long size = file.length();
-          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
-            e);
-        }
-      } catch (IOException ignored) {
-        // ignore
-      } finally {
+      String errorMessage = "Error in reading " + this;
+      if (is != null) {
+        long size = file.length();
+        errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
+      }
+      throw new IOException(errorMessage, e);
+    } finally {
+      if (shouldClose) {
         JavaUtils.closeQuietly(is);
       }
-      throw new IOException("Error in opening " + this, e);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(is);
-      throw e;
     }
   }

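A recurring cleanup in this commit: instead of closing a resource inside a `catch` block and rethrowing, the code sets a `shouldClose` flag and checks it in `finally`, so the resource is released on any failure (including unchecked exceptions) but handed off intact on success. A minimal Scala sketch of the idiom, using a hypothetical helper rather than Spark's own classes:

```scala
import java.io.{FileInputStream, InputStream}

// Hypothetical helper illustrating the close-only-on-failure idiom; not a Spark API.
def openAtOffset(path: String, offset: Long): InputStream = {
  val in = new FileInputStream(path)
  var shouldClose = true
  try {
    in.skip(offset)       // any exception thrown here must not leak `in`
    shouldClose = false   // reached only on success: ownership passes to the caller
    in
  } finally {
    if (shouldClose) {
      in.close()          // runs for every failure path, not just IOException
    }
  }
}
```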
common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 6 additions & 3 deletions
@@ -70,11 +70,14 @@ public TransportServer(
     this.appRpcHandler = appRpcHandler;
     this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
+    boolean shouldClose = true;
     try {
       init(hostToBind, portToBind);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(this);
-      throw e;
+      shouldClose = false;
+    } finally {
+      if (shouldClose) {
+        JavaUtils.closeQuietly(this);
+      }
     }
   }

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 4 deletions
@@ -49,9 +49,6 @@ private[spark] class PythonRDD(
     isFromBarrier: Boolean = false)
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.getInt("spark.buffer.size", 65536)
-  val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
-
   override def getPartitions: Array[Partition] = firstParent.partitions
 
   override val partitioner: Option[Partitioner] = {
@@ -61,7 +58,7 @@ private[spark] class PythonRDD(
   val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
-    val runner = PythonRunner(func, bufferSize, reuseWorker)
+    val runner = PythonRunner(func)
     runner.compute(firstParent.iterator(split, context), split.index, context)
   }

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 17 additions & 10 deletions
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
 
@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
 
+  private val conf = SparkEnv.get.conf
+  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  // each python worker gets an equal part of the allocation. the worker pool will grow to the
+  // number of concurrent tasks, which is determined by the number of cores in this executor.
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    .map(_ / conf.getInt("spark.executor.cores", 1))
+
   // All the Python functions should have the same exec, version and envvars.
   protected val envVars = funcs.head.funcs.head.envVars
   protected val pythonExec = funcs.head.funcs.head.pythonExec
@@ -82,7 +89,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private[spark] var serverSocket: Option[ServerSocket] = None
 
   // Authentication helper used when serving method calls via socket from Python side.
-  private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+  private lazy val authHelper = new SocketAuthHelper(conf)
 
   def compute(
       inputIterator: Iterator[IN],
@@ -95,6 +102,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (reuseWorker) {
       envVars.put("SPARK_REUSE_WORKER", "1")
     }
+    if (memoryMb.isDefined) {
+      envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
+    }
     val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
     // Whether is the worker released into idle pool
     val released = new AtomicBoolean(false)
@@ -485,20 +495,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
 private[spark] object PythonRunner {
 
-  def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = {
-    new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
+  def apply(func: PythonFunction): PythonRunner = {
+    new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))))
   }
 }
 
 /**
  * A helper class to run Python mapPartition in Spark.
  */
-private[spark] class PythonRunner(
-    funcs: Seq[ChainedPythonFunctions],
-    bufferSize: Int,
-    reuseWorker: Boolean)
+private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
   extends BasePythonRunner[Array[Byte], Array[Byte]](
-    funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {
+    funcs, PythonEvalType.NON_UDF, Array(Array(0))) {
 
   protected override def newWriterThread(
       env: SparkEnv,

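The new `memoryMb` field splits `spark.executor.pyspark.memory` evenly across the executor's cores, because the Python worker pool grows to one worker per concurrent task. A standalone sketch of that arithmetic with illustrative values (4 GiB across 4 cores gives each worker 1024 MiB):

```scala
// Illustrative values, not Spark defaults.
val pysparkMemoryMiB: Option[Long] = Some(4096L)   // spark.executor.pyspark.memory = 4g
val executorCores: Int = 4                         // spark.executor.cores

// Each Python worker receives an equal share of the executor's PySpark allocation.
val perWorkerMiB: Option[Long] = pysparkMemoryMiB.map(_ / executorCores)

// The share is exported to each worker via an environment variable.
perWorkerMiB.foreach(mb => println(s"PYSPARK_EXECUTOR_MEMORY_MB=$mb"))   // prints 1024
```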
core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 4 additions & 0 deletions
@@ -114,6 +114,10 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)
 
+  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 

core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala

Lines changed: 33 additions & 17 deletions
@@ -42,43 +42,59 @@ private[spark] class SocketAuthHelper(conf: SparkConf) {
    * Read the auth secret from the socket and compare to the expected value. Write the reply back
    * to the socket.
    *
-   * If authentication fails, this method will close the socket.
+   * If authentication fails or error is thrown, this method will close the socket.
    *
    * @param s The client socket.
    * @throws IllegalArgumentException If authentication fails.
    */
   def authClient(s: Socket): Unit = {
-    // Set the socket timeout while checking the auth secret. Reset it before returning.
-    val currentTimeout = s.getSoTimeout()
+    var shouldClose = true
     try {
-      s.setSoTimeout(10000)
-      val clientSecret = readUtf8(s)
-      if (secret == clientSecret) {
-        writeUtf8("ok", s)
-      } else {
-        writeUtf8("err", s)
-        JavaUtils.closeQuietly(s)
+      // Set the socket timeout while checking the auth secret. Reset it before returning.
+      val currentTimeout = s.getSoTimeout()
+      try {
+        s.setSoTimeout(10000)
+        val clientSecret = readUtf8(s)
+        if (secret == clientSecret) {
+          writeUtf8("ok", s)
+          shouldClose = false
+        } else {
+          writeUtf8("err", s)
+          throw new IllegalArgumentException("Authentication failed.")
+        }
+      } finally {
+        s.setSoTimeout(currentTimeout)
      }
     } finally {
-      s.setSoTimeout(currentTimeout)
+      if (shouldClose) {
+        JavaUtils.closeQuietly(s)
+      }
     }
   }
 
   /**
    * Authenticate with a server by writing the auth secret and checking the server's reply.
    *
-   * If authentication fails, this method will close the socket.
+   * If authentication fails or error is thrown, this method will close the socket.
    *
    * @param s The socket connected to the server.
    * @throws IllegalArgumentException If authentication fails.
    */
   def authToServer(s: Socket): Unit = {
-    writeUtf8(secret, s)
+    var shouldClose = true
+    try {
+      writeUtf8(secret, s)
 
-    val reply = readUtf8(s)
-    if (reply != "ok") {
-      JavaUtils.closeQuietly(s)
-      throw new IllegalArgumentException("Authentication failed.")
+      val reply = readUtf8(s)
+      if (reply != "ok") {
+        throw new IllegalArgumentException("Authentication failed.")
+      } else {
+        shouldClose = false
+      }
+    } finally {
+      if (shouldClose) {
+        JavaUtils.closeQuietly(s)
+      }
     }
   }
 

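For context, a rough sketch of how the two reworked methods are used on either end of a connection. The wiring below (server loop, host and port) is hypothetical; only the `SocketAuthHelper` constructor and the two method signatures come from the diff above:

```scala
import java.net.{ServerSocket, Socket}
import org.apache.spark.SparkConf
import org.apache.spark.security.SocketAuthHelper

// Assumes both ends are configured so that they derive the same auth secret.
val helper = new SocketAuthHelper(new SparkConf())

// Server side: verify the client's secret before using the connection.
// authClient now closes the socket itself on any failure, so no extra cleanup is needed here.
def serveOne(server: ServerSocket): Socket = {
  val client = server.accept()
  helper.authClient(client)   // throws IllegalArgumentException if the secret does not match
  client
}

// Client side: send the secret and check the server's reply.
def connect(host: String, port: Int): Socket = {
  val s = new Socket(host, port)
  helper.authToServer(s)      // closes `s` and throws if the server rejects the secret
  s
}
```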
core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -194,4 +194,50 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
     val numInvalidValues = map.iterator.count(_._2 == 0)
     assertResult(0)(numInvalidValues)
   }
+
+  test("distinguish between the 0/0.0/0L and null") {
+    val specializedMap1 = new OpenHashMap[String, Long]
+    specializedMap1("a") = null.asInstanceOf[Long]
+    specializedMap1("b") = 0L
+    assert(specializedMap1.contains("a"))
+    assert(!specializedMap1.contains("c"))
+    // null.asInstanceOf[Long] will return 0L
+    assert(specializedMap1("a") === 0L)
+    assert(specializedMap1("b") === 0L)
+    // If the data type is in the @specialized annotation and the `key` is not
+    // contained, `map(key)` will return 0
+    assert(specializedMap1("c") === 0L)
+
+    val specializedMap2 = new OpenHashMap[String, Double]
+    specializedMap2("a") = null.asInstanceOf[Double]
+    specializedMap2("b") = 0.toDouble
+    assert(specializedMap2.contains("a"))
+    assert(!specializedMap2.contains("c"))
+    // null.asInstanceOf[Double] will return 0.0
+    assert(specializedMap2("a") === 0.0)
+    assert(specializedMap2("b") === 0.0)
+    assert(specializedMap2("c") === 0.0)
+
+    val map1 = new OpenHashMap[String, Short]
+    map1("a") = null.asInstanceOf[Short]
+    map1("b") = 0.toShort
+    assert(map1.contains("a"))
+    assert(!map1.contains("c"))
+    // null.asInstanceOf[Short] will return 0
+    assert(map1("a") === 0)
+    assert(map1("b") === 0)
+    // If the data type is not in the @specialized annotation and the `key` is not
+    // contained, `map(key)` will return null
+    assert(map1("c") === null)
+
+    val map2 = new OpenHashMap[String, Float]
+    map2("a") = null.asInstanceOf[Float]
+    map2("b") = 0.toFloat
+    assert(map2.contains("a"))
+    assert(!map2.contains("c"))
+    // null.asInstanceOf[Float] will return 0.0
+    assert(map2("a") === 0.0)
+    assert(map2("b") === 0.0)
+    assert(map2("c") === null)
+  }
 }

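The practical point the new test pins down: for value types covered by `OpenHashMap`'s `@specialized` parameters, looking up a missing key returns the type's zero rather than `null`, so a stored zero and an absent key are indistinguishable by the returned value alone. A short hedged sketch of the safe lookup pattern (illustrative, not part of the patch):

```scala
import org.apache.spark.util.collection.OpenHashMap

val counts = new OpenHashMap[String, Long]
counts("seen") = 0L

val a = counts("seen")      // 0L, a value that was actually stored
val b = counts("missing")   // also 0L: the specialized default for an absent key

// When 0 is a legal value, presence has to be checked explicitly.
val lookedUp: Option[Long] =
  if (counts.contains("missing")) Some(counts("missing")) else None   // None
```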
docs/configuration.md

Lines changed: 12 additions & 0 deletions
@@ -179,6 +179,18 @@ of the most common options to set are:
     (e.g. <code>2g</code>, <code>8g</code>).
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.pyspark.memory</code></td>
+  <td>Not set</td>
+  <td>
+    The amount of memory to be allocated to PySpark in each executor, in MiB
+    unless otherwise specified. If set, PySpark memory for an executor will be
+    limited to this amount. If not set, Spark will not limit Python's memory use
+    and it is up to the application to avoid exceeding the overhead memory space
+    shared with other non-JVM processes. When PySpark is run in YARN, this memory
+    is added to executor resource requests.
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.memoryOverhead</code></td>
   <td>executorMemory * 0.10, with minimum of 384 </td>

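One hedged way to set the new key from application code, with arbitrary example values (passing the same keys to spark-submit via `--conf` works equivalently):

```scala
import org.apache.spark.SparkConf

// Example values only, not recommendations.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")           // JVM heap per executor
  .set("spark.executor.pyspark.memory", "2g")   // new key: cap for an executor's Python workers
  .set("spark.executor.cores", "4")             // the PySpark cap is shared evenly across these cores
```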
docs/running-on-kubernetes.md

Lines changed: 32 additions & 8 deletions
@@ -185,6 +185,36 @@ To use a secret through an environment variable use the following options to the
     --conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
 ```
 
+## Using Kubernetes Volumes
+
+Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods:
+* [hostPath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath): mounts a file or directory from the host node’s filesystem into a pod.
+* [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir): an initially empty volume created when a pod is assigned to a node.
+* [persistentVolumeClaim](https://kubernetes.io/docs/concepts/storage/volumes/#persistentvolumeclaim): used to mount a `PersistentVolume` into a pod.
+
+To mount a volume of any of the types above into the driver pod, use the following configuration property:
+
+```
+--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path=<mount path>
+--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly=<true|false>
+```
+
+Specifically, `VolumeType` can be one of the following values: `hostPath`, `emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to use for the volume under the `volumes` field in the pod specification.
+
+Each supported type of volumes may have some specific configuration options, which can be specified using configuration properties of the following form:
+
+```
+spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]=<value>
+```
+
+For example, the claim name of a `persistentVolumeClaim` with volume name `checkpointpvc` can be specified using the following property:
+
+```
+spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim
+```
+
+The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below.
+
 ## Introspection and Debugging
 
 These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -299,21 +329,15 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl
 
 ## Future Work
 
-There are several Spark on Kubernetes features that are currently being incubated in a fork -
-[apache-spark-on-k8s/spark](https://github.com/apache-spark-on-k8s/spark), which are expected to eventually make it into
-future versions of the spark-kubernetes integration.
+There are several Spark on Kubernetes features that are currently being worked on or planned to be worked on. Those features are expected to eventually make it into future versions of the spark-kubernetes integration.
 
 Some of these include:
 
-* R
-* Dynamic Executor Scaling
+* Dynamic Resource Allocation and External Shuffle Service
 * Local File Dependency Management
 * Spark Application Management
 * Job Queues and Resource Management
 
-You can refer to the [documentation](https://apache-spark-on-k8s.github.io/userdocs/) if you want to try these features
-and provide feedback to the development team.
-
 # Configuration
 
 See the [configuration page](configuration.html) for information on Spark configurations. The following configurations are

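As an illustration of the executor-side prefix described above, the same `checkpointpvc` example mounted into the executor pods might look like the following (the mount path `/checkpoints` is hypothetical):

```
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoints
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim
```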
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala

Lines changed: 1 addition & 1 deletion
@@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader(
 
       // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
       // or if it's the endpoint of the data range (i.e. the "true" next offset).
-      case e: IllegalStateException  if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+      case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
         val range = consumer.getAvailableOffsetRange()
         if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
           // retry
