Commit ac663aa

Merge branch 'master' into spark-34152

2 parents: 0b75fdc + 50d14c9

52 files changed: +885, -417 lines

core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html

Lines changed: 1 addition & 1 deletion
@@ -104,7 +104,7 @@ <h4 class="title-table">Executors</h4>
         Peak Pool Memory Direct / Mapped</span></th>
       <th>Disk Used</th>
       <th>Cores</th>
-      <th>Resources</th>
+      <th><span data-toggle="tooltip" data-placement="top" title="Resources currently used by each executor, such as GPU, FPGA, etc.">Resources</span></th>
       <th>Resource Profile Id</th>
       <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks currently executing. Darker shading highlights executors with more active tasks.">Active Tasks</span></th>
       <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks that have failed on this executor. Darker shading highlights executors with a high proportion of failed tasks.">Failed Tasks</span></th>

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 5 additions & 3 deletions
@@ -128,14 +128,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)

     // Messages received from executors
     case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
+      var reregisterBlockManager = !sc.isStopped
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
                 executorId, accumUpdates, blockManagerId, executorUpdates)
-              val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+              reregisterBlockManager &= unknownExecutor
+              val response = HeartbeatResponse(reregisterBlockManager)
               context.reply(response)
             }
           })

@@ -145,14 +147,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
           // not log warning here. Otherwise there may be a lot of noise especially if
           // we explicitly remove executors (SPARK-4134).
           logDebug(s"Received heartbeat from unknown executor $executorId")
-          context.reply(HeartbeatResponse(reregisterBlockManager = true))
+          context.reply(HeartbeatResponse(reregisterBlockManager))
         }
       } else {
        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
        // case rarely happens. However, if it really happens, log it and ask the executor to
        // register itself again.
        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
-       context.reply(HeartbeatResponse(reregisterBlockManager = true))
+       context.reply(HeartbeatResponse(reregisterBlockManager))
      }
   }
core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -219,6 +219,24 @@ class HeartbeatReceiverSuite
     fakeSchedulerBackend.stop()
   }

+  test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") {
+    val blockManagerId = BlockManagerId(executorId1, "localhost", 12345)
+
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+      Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
+    assert(response.reregisterBlockManager)
+
+    try {
+      sc.stopped.set(true)
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
+        Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
+      assert(!response.reregisterBlockManager)
+    } finally {
+      sc.stopped.set(false)
+    }
+  }
+
   /** Manually send a heartbeat and return the response. */
   private def triggerHeartbeat(
       executorId: String,
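
The net effect of the two files above: the heartbeat response now starts from `!sc.isStopped` instead of a hard-coded `true`, so a stopped SparkContext never asks an executor to reregister its BlockManager. A minimal standalone sketch of that guard, using hypothetical boolean stand-ins rather than the real SparkContext and scheduler:

    object HeartbeatGuardSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for sc.isStopped at the moment the heartbeat arrives.
        val contextStopped = true
        // Stand-in for !scheduler.executorHeartbeatReceived(...); previously
        // an unknown executor always forced reregistration.
        val unknownExecutor = true

        var reregisterBlockManager = !contextStopped
        reregisterBlockManager &= unknownExecutor

        // Once the context is stopped, the response can never request
        // reregistration, even for an unknown executor.
        assert(!reregisterBlockManager)
      }
    }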

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -1106,8 +1106,8 @@ class SparkSubmitSuite

     // The path and filename are preserved.
     assert(outputUri.getPath.endsWith(new Path(sourceUri).getName))
-    assert(FileUtils.readFileToString(new File(outputUri.getPath)) ===
-      FileUtils.readFileToString(new File(sourceUri.getPath)))
+    assert(FileUtils.readFileToString(new File(outputUri.getPath), StandardCharsets.UTF_8) ===
+      FileUtils.readFileToString(new File(sourceUri.getPath), StandardCharsets.UTF_8))
   }

   private def deleteTempOutputFile(outputPath: String): Unit = {

@@ -1149,7 +1149,7 @@ class SparkSubmitSuite
     val jarFile = File.createTempFile("test", ".jar")
     jarFile.deleteOnExit()
     val content = "hello, world"
-    FileUtils.write(jarFile, content)
+    FileUtils.write(jarFile, content, StandardCharsets.UTF_8)
     val hadoopConf = new Configuration()
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)

@@ -1164,7 +1164,7 @@ class SparkSubmitSuite
     val jarFile = File.createTempFile("test", ".jar")
     jarFile.deleteOnExit()
     val content = "hello, world"
-    FileUtils.write(jarFile, content)
+    FileUtils.write(jarFile, content, StandardCharsets.UTF_8)
     val hadoopConf = new Configuration()
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -199,7 +199,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     errOpt should be (None)

     val exp = IOUtils.toString(new FileInputStream(
-      new File(expRoot, HistoryServerSuite.sanitizePath(name) + "_expectation.json")))
+      new File(expRoot, HistoryServerSuite.sanitizePath(name) + "_expectation.json")),
+      StandardCharsets.UTF_8)
     // compare the ASTs so formatting differences don't cause failures
     import org.json4s._
     import org.json4s.jackson.JsonMethods._

@@ -713,7 +714,7 @@ object HistoryServerSuite {

   def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
     val (code, in, errString) = connectAndGetInputStream(url)
-    val inString = in.map(IOUtils.toString)
+    val inString = in.map(IOUtils.toString(_, StandardCharsets.UTF_8))
     (code, inString, errString)
   }

@@ -729,7 +730,7 @@ object HistoryServerSuite {
     }
     val errString = try {
       val err = Option(connection.getErrorStream())
-      err.map(IOUtils.toString)
+      err.map(IOUtils.toString(_, StandardCharsets.UTF_8))
     } catch {
       case io: IOException => None
     }
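
Both test suites above apply the same pattern: the charset-less commons-io overloads fall back to the platform default encoding (and are deprecated in recent commons-io releases), so the commit passes `StandardCharsets.UTF_8` explicitly. A minimal self-contained sketch of the idiom; the file name and content are placeholders:

    import java.io.File
    import java.nio.charset.StandardCharsets

    import org.apache.commons.io.FileUtils

    object Utf8IoSketch {
      def main(args: Array[String]): Unit = {
        val f = File.createTempFile("utf8-demo", ".txt")
        f.deleteOnExit()
        // Write and read with an explicit charset; the result no longer
        // depends on the JVM's file.encoding setting.
        FileUtils.write(f, "hello, world", StandardCharsets.UTF_8)
        val content = FileUtils.readFileToString(f, StandardCharsets.UTF_8)
        assert(content == "hello, world")
      }
    }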

dev/deps/spark-deps-hadoop-2.7-hive-2.3

Lines changed: 6 additions & 6 deletions
@@ -202,12 +202,12 @@ orc-shims/1.6.7//orc-shims-1.6.7.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
-parquet-column/1.10.1//parquet-column-1.10.1.jar
-parquet-common/1.10.1//parquet-common-1.10.1.jar
-parquet-encoding/1.10.1//parquet-encoding-1.10.1.jar
-parquet-format/2.4.0//parquet-format-2.4.0.jar
-parquet-hadoop/1.10.1//parquet-hadoop-1.10.1.jar
-parquet-jackson/1.10.1//parquet-jackson-1.10.1.jar
+parquet-column/1.11.1//parquet-column-1.11.1.jar
+parquet-common/1.11.1//parquet-common-1.11.1.jar
+parquet-encoding/1.11.1//parquet-encoding-1.11.1.jar
+parquet-format-structures/1.11.1//parquet-format-structures-1.11.1.jar
+parquet-hadoop/1.11.1//parquet-hadoop-1.11.1.jar
+parquet-jackson/1.11.1//parquet-jackson-1.11.1.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
 py4j/0.10.9.1//py4j-0.10.9.1.jar
 pyrolite/4.30//pyrolite-4.30.jar

dev/deps/spark-deps-hadoop-3.2-hive-2.3

Lines changed: 6 additions & 6 deletions
@@ -172,12 +172,12 @@ orc-shims/1.6.7//orc-shims-1.6.7.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
-parquet-column/1.10.1//parquet-column-1.10.1.jar
-parquet-common/1.10.1//parquet-common-1.10.1.jar
-parquet-encoding/1.10.1//parquet-encoding-1.10.1.jar
-parquet-format/2.4.0//parquet-format-2.4.0.jar
-parquet-hadoop/1.10.1//parquet-hadoop-1.10.1.jar
-parquet-jackson/1.10.1//parquet-jackson-1.10.1.jar
+parquet-column/1.11.1//parquet-column-1.11.1.jar
+parquet-common/1.11.1//parquet-common-1.11.1.jar
+parquet-encoding/1.11.1//parquet-encoding-1.11.1.jar
+parquet-format-structures/1.11.1//parquet-format-structures-1.11.1.jar
+parquet-hadoop/1.11.1//parquet-hadoop-1.11.1.jar
+parquet-jackson/1.11.1//parquet-jackson-1.11.1.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
 py4j/0.10.9.1//py4j-0.10.9.1.jar
 pyrolite/4.30//pyrolite-4.30.jar

docs/running-on-kubernetes.md

Lines changed: 11 additions & 0 deletions
@@ -193,6 +193,9 @@ for any reason, these pods will remain in the cluster. The executor processes sh
 driver, so the executor pods should not consume compute resources (cpu and memory) in the cluster after your application
 exits.

+You may use `spark.kubernetes.executor.podNamePrefix` to fully control the executor pod names.
+When this property is set, it's highly recommended to make it unique across all jobs in the same namespace.
+
 ### Authentication Parameters

 Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode.

@@ -873,6 +876,14 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podNamePrefix</code></td>
+  <td>(none)</td>
+  <td>
+    Prefix to use in front of the executor pod names.
+  </td>
+  <td>2.3.0</td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.lostCheck.maxAttempts</code></td>
   <td><code>10</code></td>
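
With `.internal()` removed from this option in Config.scala below, `spark.kubernetes.executor.podNamePrefix` behaves like any other user-facing property. A minimal sketch of setting it; the app name and prefix values are hypothetical, and executor pods then pick up names of the form `<prefix>-exec-<id>`:

    import org.apache.spark.SparkConf

    object PodNamePrefixSketch {
      def main(args: Array[String]): Unit = {
        // Embed something unique per job (a date, a build id) to honor the
        // uniqueness recommendation in the docs above.
        val conf = new SparkConf()
          .setAppName("nightly-etl")
          .set("spark.kubernetes.executor.podNamePrefix", "nightly-etl-20210131")
        println(conf.get("spark.kubernetes.executor.podNamePrefix"))
      }
    }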

pom.xml

Lines changed: 5 additions & 1 deletion
@@ -136,7 +136,7 @@
     <kafka.version>2.6.0</kafka.version>
     <!-- After 10.15.1.3, the minimum required version is JDK9 -->
     <derby.version>10.14.2.0</derby.version>
-    <parquet.version>1.10.1</parquet.version>
+    <parquet.version>1.11.1</parquet.version>
     <orc.version>1.6.7</orc.version>
     <jetty.version>9.4.34.v20201102</jetty.version>
     <jakartaservlet.version>4.0.3</jakartaservlet.version>

@@ -2290,6 +2290,10 @@
           <groupId>commons-pool</groupId>
           <artifactId>commons-pool</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 0 additions & 1 deletion
@@ -224,7 +224,6 @@ private[spark] object Config extends Logging {
     ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
       .doc("Prefix to use in front of the executor pod names.")
       .version("2.3.0")
-      .internal()
       .stringConf
       .createOptional
