[SPARK-9104][SPARK-9105][SPARK-9106][SPARK-9107][CORE] Netty network layer memory usage on webUI #7753
Changes from all commits (40): 39ba441, ecc1044, 2101538, 9ccaf88, 13c17fb, c9b44b1, 984feaf, 2501c82, e0ae855, 7491279, 424c172, f21a804, 2f3d30b, 7b846a2, 41874aa, 0531d0f, 27b7da1, a8fcf74, f2f0e64, 5f7a999, 5ad7a6a, c836fb9, b5aa4da, e8e2bdd, 1dffa29, 75e63c3, 0c1241c, c78628e, a93bd96, 89214f3, 1ed48c1, cb307aa, b438077, 4123ac7, 2ce9fd9, 17d094e, 4b3dbe4, 0ea7cab, 5e031ce, 87f8172
File: core/src/main/scala/org/apache/spark/executor/Executor.scala

```diff
@@ -86,6 +86,12 @@ private[spark] class Executor(
     env.blockManager.initialize(conf.getAppId)
   }

+  private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
+  executorMetrics.setHostname(Utils.localHostName)
+  if (env.rpcEnv.address != null) {
+    executorMetrics.setPort(Some(env.rpcEnv.address.port))
+  }
+
   // Whether to load classes in user jars before those in Spark jars
   private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)

@@ -431,7 +437,7 @@ private[spark] class Executor(
           metrics.updateAccumulators()

           if (isLocal) {
-            // JobProgressListener will hold an reference of it during
+            // JobProgressListener will hold a reference of it during
             // onExecutorMetricsUpdate(), then JobProgressListener can not see
             // the changes of metrics any more, so make a deep copy of it
             val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))

@@ -444,7 +450,19 @@ private[spark] class Executor(
        }
      }

-    val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
+    env.blockTransferService.getMemMetrics(this.executorMetrics)
+    val executorMetrics = if (isLocal) {
+      // JobProgressListener might hold a reference of it during onExecutorMetricsUpdate()
+      // in future, if then JobProgressListener cannot see the changes of metrics any
+      // more, so make a deep copy of it here for future change.
+      Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
```
Contributor: what's the point of this? sorry if we discussed it earlier ... if it's just to test the

Contributor (author): This is due to SPARK-3465. Currently we do not have any aggregation operations for

Contributor: ah, this is a great point. In that case, I think you can leave it in for now, but add a comment that the serialization & deserialization is just to make a copy, per SPARK-3465.
```diff
+    } else {
+      this.executorMetrics
+    }
+
+    val message = Heartbeat(
+      executorId, executorMetrics, tasksMetrics.toArray, env.blockManager.blockManagerId)

     try {
       val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
         message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
```
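For context on the review thread above: the serialize/deserialize round trip exists only to take a defensive deep copy before the object is handed to listeners (see SPARK-3465), so later updates on the executor side are not observed through a shared reference. Below is a minimal, self-contained sketch of that pattern using plain Java serialization; `Utils.serialize`/`Utils.deserialize` in the diff are Spark-internal helpers that do essentially this. The `deepCopy` helper and `Holder` class are made up for illustration.

```scala
import java.io._

// Round-trip an object through Java serialization to obtain a deep copy,
// so later mutations of the original are not visible to holders of the copy.
def deepCopy[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()

  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

// Small serializable stand-in for a mutable metrics object:
class Holder(var value: Long) extends Serializable

val original = new Holder(1L)
val snapshot = deepCopy(original)
original.value = 2L
assert(snapshot.value == 1L) // the copy is isolated from later updates
```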
New file (91 lines added): core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Metrics tracked during the execution of an executor.
 *
 * So, when adding new fields, take into consideration that the whole object can be serialized for
 * shipping off at any time to consumers of the SparkListener interface.
 */
@DeveloperApi
class ExecutorMetrics extends Serializable {

  /**
   * Host's name the executor runs on
   */
  private var _hostname: String = ""
  def hostname: String = _hostname
  private[spark] def setHostname(value: String) = _hostname = value

  /**
   * Host's port the executor runs on
   */
  private var _port: Option[Int] = None
  def port: Option[Int] = _port
  private[spark] def setPort(value: Option[Int]) = _port = value

  private[spark] def hostPort: String = {
    val hp = port match {
      case None => hostname
      case value => hostname + ":" + value.get
    }
    hp
  }

  private var _transportMetrics: TransportMetrics = new TransportMetrics
  def transportMetrics: TransportMetrics = _transportMetrics
  private[spark] def setTransportMetrics(value: TransportMetrics) = {
    _transportMetrics = value
  }
}

object ExecutorMetrics extends Serializable {
  def apply(
      hostName: String,
      port: Option[Int],
      transportMetrics: TransportMetrics): ExecutorMetrics = {
    val execMetrics = new ExecutorMetrics
    execMetrics.setHostname(hostName)
    execMetrics.setPort(port)
    execMetrics.setTransportMetrics(transportMetrics)
    execMetrics
  }
}

/**
 * :: DeveloperApi ::
 * Metrics for network layer
 */
@DeveloperApi
class TransportMetrics (
```
Contributor: I think this needs to be

Contributor (author): Yes, that's true, thank you for pointing it out.
```scala
    val timeStamp: Long = System.currentTimeMillis,
    val onHeapSize: Long = 0L,
    val offHeapSize: Long = 0L) extends Serializable

object TransportMetrics extends Serializable {
  def apply(
      timeStamp: Long,
      onHeapSize: Long,
      offHeapSize: Long): TransportMetrics = {
    new TransportMetrics(timeStamp, onHeapSize, offHeapSize)
  }
}
```
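For orientation, here is a short, hypothetical usage sketch of the API added above; the host name, port, and buffer sizes are made-up values, and only the public companion `apply` helpers and getters defined in ExecutorMetrics.scala are exercised.

```scala
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}

// A snapshot of Netty network-layer memory usage for one executor (illustrative numbers).
val transport = TransportMetrics(
  timeStamp = System.currentTimeMillis,
  onHeapSize = 64L * 1024 * 1024,    // bytes of on-heap buffers
  offHeapSize = 128L * 1024 * 1024)  // bytes of off-heap (direct) buffers

// The companion apply wires up the private[spark] setters.
val metrics = ExecutorMetrics("worker-1.example.com", Some(43521), transport)

println(s"${metrics.hostname}:${metrics.port.getOrElse("")} " +
  s"onHeap=${metrics.transportMetrics.onHeapSize} offHeap=${metrics.transportMetrics.offHeapSize}")
```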
Contributor: is the hostname really enough? What about with multiple executors on the same host? The `Heartbeat` already has the `executorId`, maybe that is all we need?

Contributor (author): It's better to use HOST:PORT, but the executor port cannot be obtained here; it would have to come from `RpcEnv.address.port`. We can get the `executorId` on the driver side when receiving the message, and that might be enough to identify the different executors, but since we will show the removed executors on the page, we cannot tell where an executor was located from the `executorId` alone, because the Executors tab only shows the active executors. We can remove the hostname here once showing removed executors on the Executors tab is supported.

Contributor: sorry I forgot about this in my last round. I don't understand the first sentence of your reply -- why can't you get the hostname & port? It looks like you can do exactly what you suggested to get the host & port from the rpc env with `executorMetrics.setHostPort(env.rpcEnv.address.hostPort)`

Contributor (author): Sorry that I forgot the reason why I didn't get the port from `rpcEnv` originally, but it seems we can get the port from it directly. Let me try to add it back.
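To illustrate the reviewer's point about multiple executors per host: hostname alone is ambiguous, while host:port (or the `executorId` carried in the `Heartbeat`) is not. A small sketch using the classes added in this PR, with made-up host names and ports:

```scala
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}

// Two executors running on the same host but listening on different RPC ports.
val a = ExecutorMetrics("worker-1", Some(43521), new TransportMetrics())
val b = ExecutorMetrics("worker-1", Some(43987), new TransportMetrics())

assert(a.hostname == b.hostname)                                          // hostname alone cannot tell them apart
assert(s"${a.hostname}:${a.port.get}" != s"${b.hostname}:${b.port.get}")  // host:port can
```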