@@ -26,15 +26,21 @@ object LogKey extends Enumeration {
val APP_DESC = Value
val APP_ID = Value
val APP_STATE = Value
val BATCH_ID = Value
val BLOCK_ID = Value
val BLOCK_MANAGER_ID = Value
val BROADCAST_ID = Value
val BUCKET = Value
val BYTECODE_SIZE = Value
val CACHE_AUTO_REMOVED_SIZE = Value
val CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value
val CACHE_UNTIL_LAST_PRODUCED_SIZE = Value
val CATEGORICAL_FEATURES = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
val CLUSTER_ID = Value
val CODEC_LEVEL = Value
val CODEC_NAME = Value
val COLUMN_DATA_TYPE_SOURCE = Value
val COLUMN_DATA_TYPE_TARGET = Value
val COLUMN_DEFAULT_VALUE = Value
@@ -47,6 +53,7 @@ object LogKey extends Enumeration {
val CONFIG3 = Value
val CONFIG4 = Value
val CONFIG5 = Value
val CONSUMER = Value
val CONTAINER = Value
val CONTAINER_ID = Value
val COUNT = Value
@@ -59,13 +66,18 @@
val DATA = Value
val DATABASE_NAME = Value
val DATAFRAME_CACHE_ENTRY = Value
val DATAFRAME_ID = Value
val DESCRIPTION = Value
val DRIVER_ID = Value
val DROPPED_PARTITIONS = Value
val DURATION = Value
val END_POINT = Value
val ENGINE = Value
val ERROR = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXECUTE_INFO = Value
val EXECUTE_KEY = Value
val EXECUTOR_ENV_REGEX = Value
val EXECUTOR_ID = Value
val EXECUTOR_IDS = Value
@@ -77,32 +89,42 @@
val FIELD_NAME = Value
val FILE_FORMAT = Value
val FILE_FORMAT2 = Value
val FROM_OFFSET = Value
val FUNCTION_NAME = Value
val FUNCTION_PARAMETER = Value
val GROUP_ID = Value
val HADOOP_VERSION = Value
val HIVE_OPERATION_STATE = Value
val HIVE_OPERATION_TYPE = Value
val HOST = Value
val HOST_PORT = Value
val INDEX = Value
val INFERENCE_MODE = Value
val INITIAL_CAPACITY = Value
val INTERVAL = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
val JOIN_CONDITION_SUB_EXPRESSION = Value
val KAFKA_PULLS_COUNT = Value
val KAFKA_RECORDS_PULLED_COUNT = Value
val KEY = Value
val LAST_ACCESS_TIME = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
val LISTENER = Value
val LOAD_FACTOR = Value
val LOG_TYPE = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
val MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value
val MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE = Value
val MAX_CAPACITY = Value
val MAX_CATEGORIES = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MERGE_DIR_NAME = Value
val MESSAGE = Value
val METHOD_NAME = Value
val MIN_SIZE = Value
val NEW_VALUE = Value
@@ -129,6 +151,7 @@
val POLICY = Value
val PORT = Value
val PRODUCER_ID = Value
val QUERY_CACHE_VALUE = Value
val QUERY_HINT = Value
val QUERY_ID = Value
val QUERY_PLAN = Value
@@ -138,21 +161,24 @@
val RANGE = Value
val RDD_ID = Value
val REASON = Value
val REATTACHABLE = Value
val RECEIVED_BLOCK_INFO = Value
val REDUCE_ID = Value
val RELATION_NAME = Value
val REMAINING_PARTITIONS = Value
val RESOURCE_NAME = Value
val RETRY_COUNT = Value
val RETRY_INTERVAL = Value
val RPC_ADDRESS = Value
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
val RULE_NUMBER_OF_RUNS = Value
val RUN_ID = Value
val SCHEMA = Value
val SCHEMA2 = Value
val SERVICE_NAME = Value
val SESSION_HOLD_INFO = Value
val SESSION_ID = Value
val SESSION_KEY = Value
val SHARD_ID = Value
val SHUFFLE_BLOCK_INFO = Value
val SHUFFLE_ID = Value
@@ -176,12 +202,20 @@
val THREAD = Value
val THREAD_NAME = Value
val TID = Value
val TIME = Value
val TIMEOUT = Value
val TIME_UNITS = Value
val TIP = Value
val TOPIC = Value
val TOPIC_PARTITION = Value
val TOPIC_PARTITIONS = Value
val TOPIC_PARTITION_OFFSET = Value
val TOPIC_PARTITION_OFFSET_RANGE = Value
val TOTAL_EFFECTIVE_TIME = Value
val TOTAL_RECORDS_READ = Value
val TOTAL_SIZE = Value
val TOTAL_TIME = Value
val TOTAL_TIME_READ = Value
val UNSUPPORTED_EXPRESSION = Value
val UNSUPPORTED_HINT_REASON = Value
val UNTIL_OFFSET = Value
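
For context (not part of the diff): the new LogKey entries above are consumed through Spark's structured-logging API, the same Logging/MDC pattern the hunks below migrate to. A minimal sketch, with an illustrative object name and message text:

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CODEC_LEVEL, CODEC_NAME}

// Any class or object mixing in Logging can use the log"..." interpolator and tag
// each interpolated value with a LogKey via MDC, producing a structured log record.
object CodecLoggingExample extends Logging {
  def logCodec(codecName: String, level: Int): Unit = {
    logInfo(log"Compressing output with the ${MDC(CODEC_NAME, codecName)} codec " +
      log"at level ${MDC(CODEC_LEVEL, level)}")
  }
}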
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Job

import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, PATH}
import org.apache.spark.internal.LogKey.{CODEC_LEVEL, CODEC_NAME, CONFIG, PATH}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroCompressionCodec._
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
@@ -118,7 +118,8 @@ private[sql] object AvroUtils extends Logging {
if (compressed.getSupportCompressionLevel) {
val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
compressed.getDefaultCompressionLevel.toString)
logInfo(s"Compressing Avro output using the $codecName codec at level $level")
logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec " +
log"at level ${MDC(CODEC_LEVEL, level)}")
val s = if (compressed == ZSTANDARD) {
val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
@@ -128,7 +129,7 @@
}
jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
} else {
logInfo(s"Compressing Avro output using the $codecName codec")
logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec")
}
}
case unknown =>
@@ -25,7 +25,7 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME, WAIT_RESULT_TIME, WAIT_SEND_TIME}
import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL}
@@ -183,9 +183,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
*/
def execute(lastConsumedStreamIndex: Long): Unit = {
logInfo(
s"Starting for opId=${executeHolder.operationId}, " +
s"reattachable=${executeHolder.reattachable}, " +
s"lastConsumedStreamIndex=$lastConsumedStreamIndex")
log"Starting for opId=${MDC(OP_ID, executeHolder.operationId)}, " +
log"reattachable=${MDC(REATTACHABLE, executeHolder.reattachable)}, " +
log"lastConsumedStreamIndex=${MDC(STREAM_ID, lastConsumedStreamIndex)}")
val startTime = System.nanoTime()

var nextIndex = lastConsumedStreamIndex + 1
@@ -294,11 +294,13 @@
}
} else if (streamFinished) {
// Stream is finished and all responses have been sent
logInfo(
s"Stream finished for opId=${executeHolder.operationId}, " +
s"sent all responses up to last index ${nextIndex - 1}. " +
s"totalTime=${System.nanoTime - startTime}ns " +
s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns")
// scalastyle:off line.size.limit
logInfo(log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " +
log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " +
log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms")
// scalastyle:on line.size.limit
executionObserver.getError() match {
case Some(t) => grpcObserver.onError(t)
case None => grpcObserver.onCompleted()
@@ -307,11 +309,14 @@
} else if (deadlineLimitReached) {
// The stream is not complete, but should be finished now.
// The client needs to reattach with ReattachExecute.
logInfo(
s"Deadline reached, shutting down stream for opId=${executeHolder.operationId} " +
s"after index ${nextIndex - 1}. " +
s"totalTime=${System.nanoTime - startTime}ns " +
s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns")
// scalastyle:off line.size.limit
logInfo(log"Deadline reached, shutting down stream for " +
log"opId=${MDC(OP_ID, executeHolder.operationId)} " +
log"after index ${MDC(STREAM_ID, nextIndex - 1)}. " +
log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms")
// scalastyle:on line.size.limit
grpcObserver.onCompleted()
finished = true
}
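
A side note on these hunks (not part of the diff): besides switching to MDC, the rewritten messages report durations in fractional milliseconds rather than raw nanoseconds. A minimal sketch of the conversion, using the NANOS_PER_MILLIS constant imported above; the variable names are illustrative:

import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS

// Elapsed time is measured with System.nanoTime and divided by NANOS_PER_MILLIS
// (1,000,000) as a Double, so the logged value is in milliseconds with a fraction.
val startTime = System.nanoTime()
// ... stream responses ...
val elapsedMs = (System.nanoTime() - startTime) / NANOS_PER_MILLIS.toDouble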
@@ -26,7 +26,8 @@ import io.grpc.stub.StreamObserver

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey
import org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE
import org.apache.spark.sql.connect.service.ExecuteHolder

@@ -242,14 +243,16 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
/** Remove all cached responses */
def removeAll(): Unit = responseLock.synchronized {
removeResponsesUntilIndex(lastProducedIndex)
// scalastyle:off line.size.limit
logInfo(
s"Release all for opId=${executeHolder.operationId}. Execution stats: " +
s"total=${totalSize} " +
s"autoRemoved=${autoRemovedSize} " +
s"cachedUntilConsumed=$cachedSizeUntilHighestConsumed " +
s"cachedUntilProduced=$cachedSizeUntilLastProduced " +
s"maxCachedUntilConsumed=${cachedSizeUntilHighestConsumed.max} " +
s"maxCachedUntilProduced=${cachedSizeUntilLastProduced.max}")
log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. Execution stats: " +
log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " +
log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " +
log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " +
log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " +
log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " +
log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}")
// scalastyle:on line.size.limit
}

/** Returns if the stream is finished. */
@@ -38,7 +38,8 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.SESSION_ID
import org.apache.spark.ml.{functions => MLFunctions}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest}
import org.apache.spark.sql.{Column, Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, SparkSession}
@@ -3131,7 +3132,9 @@ class SparkConnectPlanner(
}
} catch {
case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.
logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.")
logInfo(
log"Removing foreachBatch worker, query failed to start " +
log"for session ${MDC(SESSION_ID, sessionId)}.")
foreachBatchRunnerCleaner.foreach(_.close())
throw ex
}
@@ -26,7 +26,8 @@ import scala.util.control.NonFatal

import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DATAFRAME_ID, QUERY_ID, RUN_ID, SESSION_ID}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.connect.service.SparkConnectService
@@ -63,7 +64,8 @@ object StreamingForeachBatchHelper extends Logging {
sessionHolder: SessionHolder): ForeachBatchFnType = { (df: DataFrame, batchId: Long) =>
{
val dfId = UUID.randomUUID().toString
logInfo(s"Caching DataFrame with id $dfId") // TODO: Add query id to the log.
// TODO: Add query id to the log.
logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}")

// TODO(SPARK-44462): Sanity check there is no other active DataFrame for this query.
// The query id needs to be saved in the cache for this check.
@@ -72,7 +74,7 @@
try {
fn(FnArgsWithId(dfId, df, batchId))
} finally {
logInfo(s"Removing DataFrame with id $dfId from the cache")
logInfo(log"Removing DataFrame with id ${MDC(DATAFRAME_ID, dfId)} from the cache")
sessionHolder.removeCachedDataFrame(dfId)
}
}
@@ -133,7 +135,9 @@ object StreamingForeachBatchHelper extends Logging {
try {
dataIn.readInt() match {
case 0 =>
logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: 0)")
logInfo(
log"Python foreach batch for dfId ${MDC(DATAFRAME_ID, args.dfId)} " +
log"completed (ret: 0)")
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
val msg = PythonWorkerUtils.readUTF(dataIn)
throw new PythonException(
@@ -169,7 +173,9 @@ object StreamingForeachBatchHelper extends Logging {
private lazy val streamingListener = { // Initialized on first registered query
val listener = new StreamingRunnerCleanerListener
sessionHolder.session.streams.addListener(listener)
logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
logInfo(
log"Registered runner clean up listener for " +
log"session ${MDC(SESSION_ID, sessionHolder.sessionId)}")
listener
}

@@ -195,7 +201,9 @@

private def cleanupStreamingRunner(key: CacheKey): Unit = {
Option(cleanerCache.remove(key)).foreach { cleaner =>
logInfo(s"Cleaning up runner for queryId ${key.queryId} runId ${key.runId}.")
logInfo(
log"Cleaning up runner for queryId ${MDC(QUERY_ID, key.queryId)} " +
log"runId ${MDC(RUN_ID, key.runId)}.")
cleaner.close()
}
}
@@ -21,7 +21,8 @@ import java.io.EOFException

import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.FUNCTION_NAME
import org.apache.spark.sql.connect.service.{SessionHolder, SparkConnectService}
import org.apache.spark.sql.streaming.StreamingQueryListener

@@ -82,7 +83,9 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
try {
dataIn.readInt() match {
case 0 =>
logInfo(s"Streaming query listener function $functionName completed (ret: 0)")
logInfo(
log"Streaming query listener function ${MDC(FUNCTION_NAME, functionName)} " +
log"completed (ret: 0)")
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
val msg = PythonWorkerUtils.readUTF(dataIn)
throw new PythonException(
@@ -28,7 +28,8 @@ import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DESCRIPTION, MESSAGE}

/**
* A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON. Useful for
Expand All @@ -42,9 +43,11 @@ class LoggingInterceptor extends ServerInterceptor with Logging {
private def logProto[T](description: String, message: T): Unit = {
message match {
case m: Message =>
logInfo(s"$description:\n${jsonPrinter.print(m)}")
logInfo(log"${MDC(DESCRIPTION, description)}:\n${MDC(MESSAGE, jsonPrinter.print(m))}")
case other =>
logInfo(s"$description: (Unknown message type) $other")
logInfo(
log"${MDC(DESCRIPTION, description)}: " +
log"(Unknown message type) ${MDC(MESSAGE, other)}")
}
}
