Commit c35e50e

wakun authored, GitHub Enterprise committed

[CARMEL-6434] Expose active sql related metrics from driver (#1171)

* [CARMEL-6434] Expose active sql related metrics from driver
* Use atomic variable instead of int
* Count download data and upload data queries

1 parent a99b6e3 · commit c35e50e

File tree

6 files changed, +40 −3 lines changed

core/src/main/scala/org/apache/spark/WorkQueueSource.scala

Lines changed: 20 additions & 0 deletions

@@ -184,6 +184,12 @@ class WorkQueueSource(
       }
     })
 
+    metricRegistry.register(MetricRegistry.name("task-scheduler.runningTasks"), new Gauge[Int] {
+      override def getValue: Int = {
+        taskScheduler.get.taskSummary().runningTasks
+      }
+    })
+
     dagScheduler.map { scheduler =>
       scheduler.stageRetryCount.indices.foreach { idx =>
         metricRegistry.register(MetricRegistry.name(s"task-scheduler.stage-retry-count-${idx + 1}"),
@@ -195,6 +201,20 @@ class WorkQueueSource(
       }
     }
 
+    metricRegistry.register(MetricRegistry.name(s"task-scheduler.activeSessions"),
+      new Gauge[Int] {
+        override def getValue: Int = {
+          TaskSchedulerImpl.activeSessions.get()
+        }
+      })
+
+    metricRegistry.register(MetricRegistry.name(s"task-scheduler.activeSqls"),
+      new Gauge[Int] {
+        override def getValue: Int = {
+          TaskSchedulerImpl.activeSqls.get()
+        }
+      })
+
  }
 
  if (schedulerBackend.isDefined) {

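All three gauges follow the standard Dropwizard (Codahale) metrics pattern that Spark Source classes use: register once, and let getValue re-read the backing state on every poll, so whatever sink scrapes the registry always sees a live value. A minimal self-contained sketch of that pattern; only MetricRegistry and Gauge come from the metrics library, while GaugeSketch and its local counter are illustrative stand-ins for the TaskSchedulerImpl fields:

import java.util.concurrent.atomic.AtomicInteger

import com.codahale.metrics.{Gauge, MetricRegistry}

object GaugeSketch {
  // Stand-in for TaskSchedulerImpl.activeSessions.
  private val activeSessions = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry

    // Same shape as the patch: the gauge re-reads the counter on every
    // poll, so a reporter always observes the current value.
    registry.register(MetricRegistry.name("task-scheduler.activeSessions"),
      new Gauge[Int] {
        override def getValue: Int = activeSessions.get()
      })

    activeSessions.incrementAndGet()
    val gauge = registry.getGauges.get("task-scheduler.activeSessions")
    println(gauge.getValue) // prints 1
  }
}

Because the gauge holds a reference to the AtomicInteger rather than a snapshot, nothing needs to be re-registered as queries and sessions come and go.
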
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 0 deletions

@@ -1552,4 +1552,6 @@ private[spark] object TaskSchedulerImpl {
    }
  }
 
+  var activeSqls: AtomicInteger = new AtomicInteger(0)
+  var activeSessions: AtomicInteger = new AtomicInteger(0)
 }

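The commit message calls out "use atomic variable instead of int": these counters are mutated concurrently by many Thrift server handler threads, where a plain var would silently lose updates because read-modify-write on an Int is not atomic. A small sketch of the difference (AtomicCounterSketch and its names are illustrative):

import java.util.concurrent.atomic.AtomicInteger

object AtomicCounterSketch {
  def main(args: Array[String]): Unit = {
    var plain = 0                      // racy read-modify-write
    val atomic = new AtomicInteger(0)  // lock-free CAS, always exact

    val threads = (1 to 8).map { _ =>
      new Thread(() => {
        for (_ <- 1 to 100000) {
          plain += 1
          atomic.incrementAndGet()
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    println(s"plain:  $plain")          // typically well below 800000
    println(s"atomic: ${atomic.get()}") // always 800000
  }
}

A side note on the declaration: the fields are var, but since the AtomicInteger itself is mutable and the reference never changes, val would arguably express the intent more precisely.
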
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala

Lines changed: 4 additions & 0 deletions

@@ -39,6 +39,7 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.{Logging, QueryLogging}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.sql.{Column, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution._
@@ -237,6 +238,7 @@ private[hive] class SparkDownloadDataOperation(
 
     logInfo(s"Running query [$statementId] in session " +
       s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD '$query'")
+    TaskSchedulerImpl.activeSqls.incrementAndGet()
     val resultPath = if (keepDataType) {
       writeDataKeepDataType(new Path(pathPrefix, statementId))
     } else {
@@ -301,6 +303,8 @@ private[hive] class SparkDownloadDataOperation(
         statementId, Utils.findFirstCause(e).toString, Utils.exceptionString(e))
       val exception = new HiveSQLException(e)
       setOperationException(exception)
+    } finally {
+      TaskSchedulerImpl.activeSqls.decrementAndGet()
     }
   }

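Here the increment lands right after the query is logged, and the matching decrement rides on a finally clause added to the method's existing try/catch, so a failed download cannot leave the gauge stuck high. The same discipline could be packaged once rather than repeated per call site; a hypothetical helper sketch (ActiveCounter and tracked are illustrative names, not part of this patch):

import java.util.concurrent.atomic.AtomicInteger

object ActiveCounter {
  // Bump the counter, run the work, and always decrement, even on
  // failure. This mirrors the increment/finally pairing in the patch.
  def tracked[T](counter: AtomicInteger)(body: => T): T = {
    counter.incrementAndGet()
    try {
      body
    } finally {
      counter.decrementAndGet()
    }
  }
}

// Hypothetical usage at the download call site:
//   ActiveCounter.tracked(TaskSchedulerImpl.activeSqls) {
//     writeDataKeepDataType(new Path(pathPrefix, statementId))
//   }
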
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 3 additions & 1 deletion

@@ -36,7 +36,7 @@ import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.CorruptBroadcastException
 import org.apache.spark.internal.{config, Logging, QueryLogging}
-import org.apache.spark.scheduler.{RepeatableIterator, SpilledResultIterator, UserInfo}
+import org.apache.spark.scheduler.{RepeatableIterator, SpilledResultIterator, TaskSchedulerImpl, UserInfo}
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, ParsedStatement}
@@ -335,6 +335,7 @@ private[hive] class SparkExecuteStatementOperation(
     sqlContext.sparkContext.userResourceManager.
       foreach(_.requestQuery(UserInfo(userName, profile, None)))
 
+    TaskSchedulerImpl.activeSqls.incrementAndGet()
     withRetry {
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
@@ -385,6 +386,7 @@ private[hive] class SparkExecuteStatementOperation(
         }
       }
     } finally {
+      TaskSchedulerImpl.activeSqls.decrementAndGet()
       synchronized {
         if (!getStatus.getState.isTerminal) {
           setState(OperationState.FINISHED)

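Note that the increment sits just before withRetry, so a retried statement counts once for its whole lifetime rather than once per attempt. Once registered, the gauges are visible to whatever sink Spark's metrics system is configured with; purely as an illustration, any Dropwizard reporter can poll the same registry. A sketch with ConsoleReporter (not how Spark wires its sinks in production, which goes through metrics.properties):

import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

object ReporterSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    // ... gauges registered as in WorkQueueSource ...

    // Poll and print every metric in the registry every 10 seconds.
    val reporter = ConsoleReporter.forRegistry(registry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build()
    reporter.start(10, TimeUnit.SECONDS)
  }
}
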
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala

Lines changed: 3 additions & 1 deletion

@@ -30,7 +30,7 @@ import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.UserInfo
+import org.apache.spark.scheduler.{TaskSchedulerImpl, UserInfo}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.hive.HiveUtils
@@ -107,6 +107,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
       }
     }
     sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
+    TaskSchedulerImpl.activeSessions.incrementAndGet()
     logInfo(s"Successfully open session ${sessionHandle.getSessionId.toString}")
     sessionHandle
   } catch {
@@ -135,6 +136,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     ctx.clearTempTables()
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
+    TaskSchedulerImpl.activeSessions.decrementAndGet()
 
     // Remove temporary folder generated by data upload operation
     val dataUploadBaseDir = Some(Utils.resolveURI(

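The session counter is the mirror image of the SQL counter: incremented once a session is fully opened and registered, decremented when it is closed, so the gauge equals the number of currently open Thrift sessions. A compact sketch of the pairing (SessionCounterSketch is a simplified illustration, not the actual class):

import java.util.concurrent.atomic.AtomicInteger

// Simplified sketch of the open/close pairing; the real class also
// manages SQLContexts, temp folders, and the underlying Hive session.
class SessionCounterSketch {
  private val activeSessions = new AtomicInteger(0)

  def openSession(): Unit = {
    // ... create and register the session's SQLContext ...
    activeSessions.incrementAndGet() // counted only once fully opened
  }

  def closeSession(): Unit = {
    // ... clear temp tables, close the underlying session ...
    activeSessions.decrementAndGet()
  }

  def current: Int = activeSessions.get()
}

One caveat visible in the hunks: the decrement runs only after super.closeSession(sessionHandle) returns, so an exception there would leave the gauge one too high; whether that matters depends on how close failures are handled upstream.
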
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkTransferDataOperation.scala

Lines changed: 8 additions & 1 deletion

@@ -30,6 +30,7 @@ import org.apache.hive.service.cli.operation.TransferDataOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.{Logging, QueryLogging}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.hive.thriftserver.errors.QueryLevelRestrictionErrors
 import org.apache.spark.util.Utils
@@ -83,7 +84,13 @@ private[hive] class SparkTransferDataOperation(
     if(!fileSystem.exists(new Path(sessionPath))) {
       fileSystem.mkdirs(new Path(sessionPath))
     }
-    persistData(fileSystem, hadoopConf, new Path(getDataPath(dataUploadBaseDir, sessionId, path)))
+    try {
+      TaskSchedulerImpl.activeSqls.incrementAndGet()
+      val dataPath = new Path(getDataPath(dataUploadBaseDir, sessionId, path))
+      persistData(fileSystem, hadoopConf, dataPath)
+    } finally {
+      TaskSchedulerImpl.activeSqls.decrementAndGet()
+    }
   }
 
   private def persistData(fileSystem: FileSystem, hadoopConf: Configuration,

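Unlike the other call sites, the upload path wraps the increment and the work in a fresh try block, with the increment as the first statement inside it. That is still balanced, because AtomicInteger.incrementAndGet cannot throw, so every entry into the try is matched by the finally. A quick check of that property with a failing stand-in for persistData (BalanceCheck is illustrative):

import java.util.concurrent.atomic.AtomicInteger

object BalanceCheck {
  def main(args: Array[String]): Unit = {
    val activeSqls = new AtomicInteger(0)
    try {
      try {
        activeSqls.incrementAndGet()
        throw new RuntimeException("persistData failed") // simulated failure
      } finally {
        activeSqls.decrementAndGet()
      }
    } catch {
      case _: RuntimeException => // swallowed for this check
    }
    assert(activeSqls.get() == 0) // balanced: no leaked "active" query
    println(s"activeSqls after failure: ${activeSqls.get()}")
  }
}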