
Commit 2a191cf

[SPARK-26753][CORE] Fix for ensuring custom log levels for packages/classes work for spark-shell
This fix replaces the ConsoleAppender's Threshold with a Filter that checks whether the event's log level is at or above the thresholdLevel (the shell log level) or the log originated from a custom-defined logger. In either case the log event is let through; otherwise it is denied.

Testing done:
1. Ensured that custom log levels work when set by default (via log4j.properties).
2. Ensured that logs are not printed twice when the log level is changed by setLogLevel.
3. Ensured that custom logs are printed when the log level is changed back by setLogLevel.
1 parent: dfed439 · commit: 2a191cf

File tree: 2 files changed (+57, −12 lines)
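For context on the mechanism described in the commit message: in log4j 1.x, a Filter attached to an appender is consulted for every event and answers ACCEPT, NEUTRAL, or DENY, so a filter can exempt specific loggers in a way a blanket Threshold cannot. A minimal, self-contained sketch of that mechanism (the ExampleFilter, FilterDemo, and com.example names are illustrative, not from the patch):

import org.apache.log4j.{ConsoleAppender, Level, Logger, PatternLayout}
import org.apache.log4j.spi.{Filter, LoggingEvent}

// Deny events below WARN unless they come from a (hypothetical) custom logger.
class ExampleFilter extends Filter {
  def decide(event: LoggingEvent): Int = {
    if (event.getLevel.isGreaterOrEqual(Level.WARN)) Filter.NEUTRAL
    else if (event.getLoggerName.startsWith("com.example")) Filter.NEUTRAL
    else Filter.DENY
  }
}

object FilterDemo extends App {
  val ca = new ConsoleAppender(new PatternLayout("%p %c - %m%n"))
  ca.addFilter(new ExampleFilter)  // filters form a chain, consulted per event
  val root = Logger.getRootLogger
  root.addAppender(ca)
  root.setLevel(Level.INFO)

  root.info("suppressed")                 // DENY: not printed
  root.warn("printed: at or above WARN")  // NEUTRAL: printed
  Logger.getLogger("com.example.Foo").info("printed: custom logger")  // NEUTRAL
}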

core/src/main/scala/org/apache/spark/internal/Logging.scala (17 additions, 11 deletions)

@@ -25,7 +25,7 @@ import org.apache.log4j._
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder

-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkShellLoggingFilter, Utils}

 /**
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -156,14 +156,7 @@ trait Logging {
       }
       rootLogger.getAllAppenders().asScala.foreach {
         case ca: ConsoleAppender =>
-          Option(ca.getThreshold()) match {
-            case Some(t) =>
-              Logging.consoleAppenderToThreshold.put(ca, t)
-              if (!t.isGreaterOrEqual(replLevel)) {
-                ca.setThreshold(replLevel)
-              }
-            case None => ca.setThreshold(replLevel)
-          }
+          ca.addFilter(new SparkShellLoggingFilter(replLevel))
         case _ => // no-op
       }
     }
@@ -182,7 +175,6 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
-  private val consoleAppenderToThreshold = new ConcurrentHashMap[ConsoleAppender, Priority]()

   val initLock = new Object()
   try {
@@ -213,7 +205,21 @@ private[spark] object Logging {
       rootLogger.setLevel(defaultRootLevel)
       rootLogger.getAllAppenders().asScala.foreach {
         case ca: ConsoleAppender =>
+          // SparkShellLoggingFilter is the last filter
+          ca.getFirstFilter() match {
+            case ssf: SparkShellLoggingFilter =>
+              ca.clearFilters()
+            case f: org.apache.log4j.spi.Filter =>
+              var previous = f
+              var current = previous.getNext()
+              while (current != null && !current.isInstanceOf[SparkShellLoggingFilter]) {
+                previous = current
+                current = previous.getNext()
+              }
+              if (current != null) {
+                previous.setNext(current.getNext())
+              }
+          }
-          ca.setThreshold(consoleAppenderToThreshold.get(ca))
         case _ => // no-op
       }
     }
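The restore path above unlinks the shell filter from the appender's singly-linked filter chain. The same traversal, extracted into a standalone helper for readability (the FilterChains object and its generic signature are my own sketch, not part of the patch):

import scala.reflect.ClassTag

import org.apache.log4j.ConsoleAppender
import org.apache.log4j.spi.Filter

object FilterChains {
  // Hypothetical helper: detach the first filter of type T from an appender's
  // singly-linked filter chain, mirroring the traversal in the patch.
  def removeFilter[T <: Filter](ca: ConsoleAppender)(implicit ct: ClassTag[T]): Unit = {
    ca.getFirstFilter() match {
      case head: Filter if ct.runtimeClass.isInstance(head) =>
        // Target is the head; the patch assumes the shell filter is the only
        // filter present in this case and clears the whole chain.
        ca.clearFilters()
      case head: Filter =>
        var previous = head
        var current = previous.getNext()
        while (current != null && !ct.runtimeClass.isInstance(current)) {
          previous = current
          current = previous.getNext()
        }
        if (current != null) {
          previous.setNext(current.getNext())  // unlink the matching filter
        }
      case _ => // no filters attached
    }
  }
}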

core/src/main/scala/org/apache/spark/util/Utils.scala (40 additions, 1 deletion)

@@ -53,6 +53,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.log4j.{Level, LogManager}
+import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.eclipse.jetty.util.MultiException
 import org.slf4j.Logger

@@ -2285,7 +2287,16 @@ private[spark] object Utils extends Logging {
     val rootLogger = org.apache.log4j.Logger.getRootLogger()
     rootLogger.setLevel(l)
     rootLogger.getAllAppenders().asScala.foreach {
-      case ca: org.apache.log4j.ConsoleAppender => ca.setThreshold(l)
+      case ca: org.apache.log4j.ConsoleAppender =>
+        var f = ca.getFirstFilter()
+        while (f != null) {
+          f match {
+            case ssf: SparkShellLoggingFilter =>
+              ssf.setThresholdLevel(l)
+            case _ => // leave other filters in the chain untouched
+          }
+          f = f.getNext()
+        }
       case _ => // no-op
     }
   }
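With this change, Utils.setLogLevel no longer overwrites the appender's Threshold; it updates the threshold held by any attached SparkShellLoggingFilter. A usage sketch matching the commit's testing notes (the com.example logger and its properties entry are illustrative):

// In spark-shell:
sc.setLogLevel("WARN")  // INFO from root-derived loggers no longer reaches the console

// A logger with its own level configured (e.g. log4j.logger.com.example=INFO in
// log4j.properties) is exempted by the filter and still prints at INFO:
org.apache.log4j.Logger.getLogger("com.example").info("still printed, exactly once")

sc.setLogLevel("INFO")  // changing back restores the default behavior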
@@ -2991,3 +3001,32 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
     new String(nonCircularBuffer, StandardCharsets.UTF_8)
   }
 }
+
+private[spark] class SparkShellLoggingFilter(var thresholdLevel: Level) extends Filter {
+
+  /**
+   * If the log level of the event is lower than thresholdLevel, the decision is made based on
+   * whether the log came from the root logger or from a logger with a custom configuration.
+   * @param loggingEvent the log event to decide on
+   * @return decision to accept/deny the log event
+   */
+  def decide(loggingEvent: LoggingEvent): Int = {
+    val rootLevel = LogManager.getRootLogger().getLevel()
+    if (loggingEvent.getLevel().isGreaterOrEqual(thresholdLevel) ||
+        !loggingEvent.getLevel().eq(rootLevel)) {
+      return Filter.NEUTRAL
+    }
+    var logger = loggingEvent.getLogger()
+    while (logger.getParent() != null) {
+      if (logger.getLevel() != null) {
+        return Filter.NEUTRAL
+      }
+      logger = logger.getParent()
+    }
+    Filter.DENY
+  }
+
+  private[spark] def setThresholdLevel(level: Level): Unit = {
+    thresholdLevel = level
+  }
+}
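A test-style sketch of the filter's decision table under the rules above (assumed to compile inside the org.apache.spark package, since the filter is private[spark]; the logger names and levels are illustrative):

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.log4j.spi.{Filter, LoggingEvent}

object SparkShellLoggingFilterDemo extends App {
  val filter = new org.apache.spark.util.SparkShellLoggingFilter(Level.WARN)
  val root = LogManager.getRootLogger()
  root.setLevel(Level.INFO)

  def eventFor(logger: Logger, level: Level): LoggingEvent =
    new LoggingEvent(classOf[Logger].getName, logger, level, "msg", null)

  // INFO from a root-derived logger: below the WARN threshold, equal to the
  // root level, and no explicitly configured level below the root -> DENY.
  assert(filter.decide(eventFor(root, Level.INFO)) == Filter.DENY)

  // WARN and above always pass -> NEUTRAL.
  assert(filter.decide(eventFor(root, Level.WARN)) == Filter.NEUTRAL)

  // INFO from a logger with an explicitly configured level -> NEUTRAL.
  val custom = Logger.getLogger("com.example")
  custom.setLevel(Level.INFO)
  assert(filter.decide(eventFor(custom, Level.INFO)) == Filter.NEUTRAL)
}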
