Commit 207933b

Merge remote-tracking branch 'apache/master' into SPARK-23581
# Conflicts:
#   sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
2 parents 8ed0695 + 4c587eb commit 207933b

45 files changed: +647, -699 lines


core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 0 additions & 45 deletions
@@ -256,51 +256,6 @@ private[spark] class SecurityManager(
   // the default SSL configuration - it will be used by all communication layers unless overwritten
   private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
 
-  // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
-  val fileServerSSLOptions = getSSLOptions("fs")
-  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
-    val trustStoreManagers =
-      for (trustStore <- fileServerSSLOptions.trustStore) yield {
-        val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()
-
-        try {
-          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
-          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)
-
-          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
-          tmf.init(ks)
-          tmf.getTrustManagers
-        } finally {
-          input.close()
-        }
-      }
-
-    lazy val credulousTrustStoreManagers = Array({
-      logWarning("Using 'accept-all' trust manager for SSL connections.")
-      new X509TrustManager {
-        override def getAcceptedIssuers: Array[X509Certificate] = null
-
-        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
-
-        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
-      }: TrustManager
-    })
-
-    require(fileServerSSLOptions.protocol.isDefined,
-      "spark.ssl.protocol is required when enabling SSL connections.")
-
-    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.get)
-    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
-
-    val hostVerifier = new HostnameVerifier {
-      override def verify(s: String, sslSession: SSLSession): Boolean = true
-    }
-
-    (Some(sslContext.getSocketFactory), Some(hostVerifier))
-  } else {
-    (None, None)
-  }
-
   def getSSLOptions(module: String): SSLOptions = {
     val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
     logDebug(s"Created SSL options for $module: $opts")
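Note on the surviving getSSLOptions path: each module namespace (spark.ssl.ui, spark.ssl.standalone, ...) inherits the base spark.ssl settings unless it overrides them, which is why the "fs" block above could simply be dropped. A minimal, REPL-style sketch of that fallback rule (hypothetical helper, not Spark's actual SSLOptions code; key names follow docs/security.md):

    // Hypothetical illustration of namespace fallback.
    def sslSetting(conf: Map[String, String], module: String, key: String): Option[String] =
      conf.get(s"spark.ssl.$module.$key").orElse(conf.get(s"spark.ssl.$key"))

    val conf = Map("spark.ssl.enabled" -> "true", "spark.ssl.ui.enabled" -> "false")
    sslSetting(conf, "ui", "enabled")          // Some("false"): the module-level key wins
    sslSetting(conf, "standalone", "enabled")  // Some("true"): falls back to the base spark.ssl key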

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 8 additions & 1 deletion
@@ -94,7 +94,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     }
 
     // Sort the output if there is a sort ordering defined.
-    dep.keyOrdering match {
+    val resultIter = dep.keyOrdering match {
       case Some(keyOrd: Ordering[K]) =>
         // Create an ExternalSorter to sort the data.
         val sorter =
@@ -103,9 +103,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
         context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
         context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
         context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+        // Use completion callback to stop sorter if task was finished/cancelled.
+        context.addTaskCompletionListener(_ => {
+          sorter.stop()
+        })
         CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
       case None =>
         aggregatedIter
     }
+    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
+    // sorter may have consumed previous interruptible iterator.
+    new InterruptibleIterator[Product2[K, C]](context, resultIter)
   }
 }
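The re-wrapping matters because the aggregator or sorter may have fully drained the upstream interruptible iterator before cancellation is requested; wrapping the result restores a cancellation check on every element handed to the downstream consumer. A simplified, stand-alone sketch of the wrapping idea (not the real org.apache.spark.InterruptibleIterator, which consults the TaskContext kill flag):

    // Simplified sketch; `isCancelled` stands in for the task's kill flag.
    class CancellableIterator[T](isCancelled: () => Boolean, delegate: Iterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = {
        if (isCancelled()) throw new InterruptedException("task was cancelled")
        delegate.hasNext
      }
      override def next(): T = delegate.next()
    }
    // e.g. new CancellableIterator(() => taskKilled, upstreamIterator)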

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 15 deletions
@@ -673,7 +673,6 @@ private[spark] object Utils extends Logging {
       logDebug("fetchFile not using security")
       uc = new URL(url).openConnection()
     }
-    Utils.setupSecureURLConnection(uc, securityMgr)
 
     val timeoutMs =
       conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
@@ -2363,20 +2362,6 @@
     PropertyConfigurator.configure(pro)
   }
 
-  /**
-   * If the given URL connection is HttpsURLConnection, it sets the SSL socket factory and
-   * the host verifier from the given security manager.
-   */
-  def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
-    urlConnection match {
-      case https: HttpsURLConnection =>
-        sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
-        sm.hostnameVerifier.foreach(https.setHostnameVerifier)
-        https
-      case connection => connection
-    }
-  }
-
   def invoke(
       clazz: Class[_],
       obj: AnyRef,
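The deleted setupSecureURLConnection helper relied on the standard javax.net.ssl hooks: an HttpsURLConnection accepts a per-connection SSLSocketFactory (and HostnameVerifier). A small sketch of that JDK pattern, with a hypothetical helper name rather than Spark code:

    import java.net.URL
    import javax.net.ssl.{HttpsURLConnection, SSLContext}

    // Hypothetical helper: install a custom SSL context on a single HTTPS connection.
    def openWithSslContext(url: String, ctx: SSLContext): HttpsURLConnection = {
      val conn = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
      conn.setSSLSocketFactory(ctx.getSocketFactory) // per-connection override of the JVM default factory
      conn
    }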

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 64 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
@@ -26,7 +27,7 @@ import scala.concurrent.duration._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   override def afterEach() {
     try {
       resetSparkContext()
+      JobCancellationSuite.taskStartedSemaphore.drainPermits()
+      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
+      JobCancellationSuite.twoJobsSharingStageSemaphore.drainPermits()
+      JobCancellationSuite.executionOfInterruptibleCounter.set(0)
     } finally {
       super.afterEach()
     }
@@ -320,6 +325,62 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     f2.get()
   }
 
+  test("interruptible iterator of shuffle reader") {
+    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
+    // execution and a counter is used to make sure that the corresponding tasks are indeed
+    // cancelled.
+    import JobCancellationSuite._
+    sc = new SparkContext("local[2]", "test interruptible iterator")
+
+    val taskCompletedSem = new Semaphore(0)
+
+    sc.addSparkListener(new SparkListener {
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+        // release taskCancelledSemaphore when cancelTasks event has been posted
+        if (stageCompleted.stageInfo.stageId == 1) {
+          taskCancelledSemaphore.release(1000)
+        }
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        if (taskEnd.stageId == 1) { // make sure tasks are completed
+          taskCompletedSem.release()
+        }
+      }
+    })
+
+    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+      .repartitionAndSortWithinPartitions(new HashPartitioner(1))
+      .mapPartitions { iter =>
+        taskStartedSemaphore.release()
+        iter
+      }.foreachAsync { x =>
+        if (x._1 >= 10) {
+          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
+          // next iteration will be cancelled if the source iterator is interruptible. Then in this
+          // case, the maximum num of increment would be 10(|1...10|)
+          taskCancelledSemaphore.acquire()
+        }
+        executionOfInterruptibleCounter.getAndIncrement()
+      }
+
+    taskStartedSemaphore.acquire()
+    // Job is cancelled when:
+    // 1. task in reduce stage has been started, guaranteed by previous line.
+    // 2. task in reduce stage is blocked after processing at most 10 records as
+    //    taskCancelledSemaphore is not released until cancelTasks event is posted
+    // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
+    // executed.
+    f.cancel()
+
+    val e = intercept[SparkException](f.get()).getCause
+    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+
+    // Make sure tasks are indeed completed.
+    taskCompletedSem.acquire()
+    assert(executionOfInterruptibleCounter.get() <= 10)
+  }
+
   def testCount() {
     // Cancel before launching any tasks
     {
@@ -381,7 +442,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
 
 
 object JobCancellationSuite {
+  // To avoid any headaches, reset these global variables in the companion class's afterEach block
   val taskStartedSemaphore = new Semaphore(0)
   val taskCancelledSemaphore = new Semaphore(0)
   val twoJobsSharingStageSemaphore = new Semaphore(0)
+  val executionOfInterruptibleCounter = new AtomicInteger(0)
 }
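The test coordinates the driver and the reduce task through shared semaphores: the task signals that it has started, then blocks until the stage-completed listener releases it after cancellation. A stripped-down sketch of that handshake pattern outside Spark (plain threads, hypothetical names):

    import java.util.concurrent.Semaphore

    object HandshakeDemo extends App {
      val started = new Semaphore(0)
      val proceed = new Semaphore(0)

      val worker = new Thread(() => {
        started.release()   // like taskStartedSemaphore: announce that work has begun
        proceed.acquire()   // like taskCancelledSemaphore: block until allowed to continue
      })
      worker.start()

      started.acquire()     // coordinator waits for the worker to start before acting (e.g. cancelling)
      proceed.release()     // then unblocks the worker so it can finish cleanly
      worker.join()
    }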

core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala

Lines changed: 0 additions & 68 deletions
This file was deleted.

core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala

Lines changed: 0 additions & 45 deletions
@@ -370,51 +370,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     assert(securityManager.checkModifyPermissions("user1") === false)
   }
 
-  test("ssl on setup") {
-    val conf = SSLSampleConfigs.sparkSSLConfig()
-    val expectedAlgorithms = Set(
-      "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
-      "TLS_RSA_WITH_AES_256_CBC_SHA256",
-      "TLS_DHE_RSA_WITH_AES_256_CBC_SHA256",
-      "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
-      "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256",
-      "SSL_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
-      "SSL_RSA_WITH_AES_256_CBC_SHA256",
-      "SSL_DHE_RSA_WITH_AES_256_CBC_SHA256",
-      "SSL_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
-      "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
-
-    val securityManager = new SecurityManager(conf)
-
-    assert(securityManager.fileServerSSLOptions.enabled === true)
-
-    assert(securityManager.sslSocketFactory.isDefined === true)
-    assert(securityManager.hostnameVerifier.isDefined === true)
-
-    assert(securityManager.fileServerSSLOptions.trustStore.isDefined === true)
-    assert(securityManager.fileServerSSLOptions.trustStore.get.getName === "truststore")
-    assert(securityManager.fileServerSSLOptions.keyStore.isDefined === true)
-    assert(securityManager.fileServerSSLOptions.keyStore.get.getName === "keystore")
-    assert(securityManager.fileServerSSLOptions.trustStorePassword === Some("password"))
-    assert(securityManager.fileServerSSLOptions.keyStorePassword === Some("password"))
-    assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
-    assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
-    assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
-  }
-
-  test("ssl off setup") {
-    val file = File.createTempFile("SSLOptionsSuite", "conf", Utils.createTempDir())
-
-    System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
-    val conf = new SparkConf()
-
-    val securityManager = new SecurityManager(conf)
-
-    assert(securityManager.fileServerSSLOptions.enabled === false)
-    assert(securityManager.sslSocketFactory.isDefined === false)
-    assert(securityManager.hostnameVerifier.isDefined === false)
-  }
-
   test("missing secret authentication key") {
     val conf = new SparkConf().set("spark.authenticate", "true")
     val mgr = new SecurityManager(conf)

dev/create-release/release-build.sh

Lines changed: 1 addition & 19 deletions
@@ -164,8 +164,6 @@ if [[ "$1" == "package" ]]; then
   tar cvzf spark-$SPARK_VERSION.tgz spark-$SPARK_VERSION
   echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour --output spark-$SPARK_VERSION.tgz.asc \
     --detach-sig spark-$SPARK_VERSION.tgz
-  echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md MD5 spark-$SPARK_VERSION.tgz > \
-    spark-$SPARK_VERSION.tgz.md5
   echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
     SHA512 spark-$SPARK_VERSION.tgz > spark-$SPARK_VERSION.tgz.sha512
   rm -rf spark-$SPARK_VERSION
@@ -215,9 +213,6 @@ if [[ "$1" == "package" ]]; then
       echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
        --output $R_DIST_NAME.asc \
        --detach-sig $R_DIST_NAME
-      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
-        MD5 $R_DIST_NAME > \
-        $R_DIST_NAME.md5
      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
        SHA512 $R_DIST_NAME > \
        $R_DIST_NAME.sha512
@@ -234,9 +229,6 @@ if [[ "$1" == "package" ]]; then
      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
        --output $PYTHON_DIST_NAME.asc \
        --detach-sig $PYTHON_DIST_NAME
-      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
-        MD5 $PYTHON_DIST_NAME > \
-        $PYTHON_DIST_NAME.md5
      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
        SHA512 $PYTHON_DIST_NAME > \
        $PYTHON_DIST_NAME.sha512
@@ -247,9 +239,6 @@ if [[ "$1" == "package" ]]; then
    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
      --output spark-$SPARK_VERSION-bin-$NAME.tgz.asc \
      --detach-sig spark-$SPARK_VERSION-bin-$NAME.tgz
-    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
-      MD5 spark-$SPARK_VERSION-bin-$NAME.tgz > \
-      spark-$SPARK_VERSION-bin-$NAME.tgz.md5
    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
      SHA512 spark-$SPARK_VERSION-bin-$NAME.tgz > \
      spark-$SPARK_VERSION-bin-$NAME.tgz.sha512
@@ -382,18 +371,11 @@ if [[ "$1" == "publish-release" ]]; then
  find . -type f |grep -v \.jar |grep -v \.pom | xargs rm
 
  echo "Creating hash and signature files"
-  # this must have .asc, .md5 and .sha1 - it really doesn't like anything else there
+  # this must have .asc and .sha1 - it really doesn't like anything else there
  for file in $(find . -type f)
  do
    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --output $file.asc \
      --detach-sig --armour $file;
-    if [ $(command -v md5) ]; then
-      # Available on OS X; -q to keep only hash
-      md5 -q $file > $file.md5
-    else
-      # Available on Linux; cut to keep only hash
-      md5sum $file | cut -f1 -d' ' > $file.md5
-    fi
    sha1sum $file | cut -f1 -d' ' > $file.sha1
  done
 
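With this change the release script no longer emits .md5 sidecar files, keeping SHA-512 for the distribution artifacts and SHA-1 for the Maven staging files. For reference only, an equivalent SHA-512 digest can be computed from Scala with the JDK's MessageDigest; a hedged sketch (hypothetical helper, plain hex output rather than gpg's grouped --print-md formatting):

    import java.nio.file.{Files, Paths}
    import java.security.MessageDigest

    // Plain hex SHA-512 of a file; reads the whole file into memory, fine for a sketch.
    def sha512Hex(path: String): String = {
      val bytes = Files.readAllBytes(Paths.get(path))
      MessageDigest.getInstance("SHA-512").digest(bytes).map("%02x".format(_)).mkString
    }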

docs/security.md

Lines changed: 0 additions & 4 deletions
@@ -44,10 +44,6 @@ component-specific configuration namespaces used to override the default setting
   <th>Config Namespace</th>
   <th>Component</th>
 </tr>
-<tr>
-  <td><code>spark.ssl.fs</code></td>
-  <td>File download client (used to download jars and files from HTTPS-enabled servers).</td>
-</tr>
 <tr>
   <td><code>spark.ssl.ui</code></td>
   <td>Spark application Web UI</td>
