Commit be3933d

Michael Allman authored and rxin committed
[SPARK-17993][SQL] Fix Parquet log output redirection
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)

## What changes were proposed in this pull request?

PR #14690 broke parquet log output redirection for converted partitioned Hive tables. For example, when querying parquet files written by Parquet-mr 1.6.0 Spark prints a torrent of (harmless) warning messages from the Parquet reader:

```
Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
	at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
	at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
	at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

This only happens during execution, not planning, and it doesn't matter what log level the `SparkContext` is set to. That's because Parquet (versions < 1.9) doesn't use slf4j for logging.

Note, you can tell that log redirection is not working here because the log message format does not conform to the default Spark log message format.

This is a regression I noted as something we needed to fix as a follow-up. It appears that the problem arose because we removed the call to `inferSchema` during Hive table conversion. That call is what triggered the output redirection.

## How was this patch tested?

I tested this manually in four ways:

1. Executing `spark.sqlContext.range(10).selectExpr("id as a").write.mode("overwrite").parquet("test")`.
2. Executing `spark.read.format("parquet").load(legacyParquetFile).show` for a Parquet file `legacyParquetFile` written using Parquet-mr 1.6.0.
3. Executing `select * from legacy_parquet_table limit 1` for some unpartitioned Parquet-based Hive table written using Parquet-mr 1.6.0.
4. Executing `select * from legacy_partitioned_parquet_table where partcol=x limit 1` for some partitioned Parquet-based Hive table written using Parquet-mr 1.6.0.

I ran each test with a new instance of `spark-shell` or `spark-sql`. Incidentally, I found that test case 3 was not a regression; redirection was not occurring in the master codebase prior to #14690.

I spent some time working on a unit test, but based on my experience working on this ticket I feel that automated testing here is far from feasible.

cc ericl dongjoon-hyun

Author: Michael Allman <[email protected]>

Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.

(cherry picked from commit b533fa2)
Signed-off-by: Reynold Xin <[email protected]>
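For background, the redirection technique this patch restores is the standard JUL-to-SLF4J bridge from `org.slf4j:jul-to-slf4j`. Below is a minimal sketch of that general technique (illustrative only, not code from this patch; the class name and logger name are made up, and the patch itself installs the bridge on specific Parquet loggers rather than on the root logger):

```java
import java.util.logging.Logger;

import org.slf4j.bridge.SLF4JBridgeHandler;

// Minimal sketch of JUL-to-SLF4J bridging. Requires org.slf4j:jul-to-slf4j on the classpath.
public class JulBridgeSketch {
  public static void main(String[] args) {
    // Remove JUL's default console handler so records aren't printed twice.
    SLF4JBridgeHandler.removeHandlersForRootLogger();
    // Route all JUL records through SLF4J (and from there into the log4j backend).
    SLF4JBridgeHandler.install();
    // This record now passes through SLF4J instead of JUL's own console formatting.
    Logger.getLogger("parquet").warning("now formatted by the SLF4J backend");
  }
}
```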
1 parent 62236b9 · commit be3933d

File tree

4 files changed: +90 −48
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java

Lines changed: 72 additions & 0 deletions (new file)
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.Serializable;
import java.util.logging.Handler;
import java.util.logging.Logger;

import org.apache.parquet.Log;
import org.slf4j.bridge.SLF4JBridgeHandler;

// Redirects the JUL logging for parquet-mr versions <= 1.8 to SLF4J logging using
// SLF4JBridgeHandler. Parquet-mr versions >= 1.9 use SLF4J directly
final class ParquetLogRedirector implements Serializable {
  // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
  // especially important for Serializable classes where fields are set but constructors are
  // ignored
  static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();

  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
  // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8
  private static final Logger apacheParquetLogger =
    Logger.getLogger(Log.class.getPackage().getName());
  private static final Logger parquetLogger = Logger.getLogger("parquet");

  static {
    // For parquet-mr 1.7 and 1.8, which are under `org.apache.parquet` namespace.
    try {
      Class.forName(Log.class.getName());
      redirect(Logger.getLogger(Log.class.getPackage().getName()));
    } catch (ClassNotFoundException ex) {
      throw new RuntimeException(ex);
    }

    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
    // namespace.
    try {
      Class.forName("parquet.Log");
      redirect(Logger.getLogger("parquet"));
    } catch (Throwable t) {
      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
      // should be removed after this issue is fixed.
    }
  }

  private ParquetLogRedirector() {
  }

  private static void redirect(Logger logger) {
    for (Handler handler : logger.getHandlers()) {
      logger.removeHandler(handler);
    }
    logger.setUseParentHandlers(false);
    logger.addHandler(new SLF4JBridgeHandler());
  }
}
```
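Why does merely holding a reference to `INSTANCE` work? Java deserialization skips constructors but must still load, and therefore statically initialize, the class of every object in the stream. Here is a minimal, self-contained sketch of that behavior (the `Redirector` and `Client` names are hypothetical stand-ins for `ParquetLogRedirector` and `ParquetFileFormat`, not Spark code):

```java
import java.io.*;

// Hypothetical stand-in for ParquetLogRedirector: a Serializable singleton whose
// static initializer performs the side effect we care about.
final class Redirector implements Serializable {
  static final Redirector INSTANCE = new Redirector();
  static {
    // Stands in for the JUL handler swap performed by ParquetLogRedirector.
    System.out.println("Redirector class initialized");
  }
  private Redirector() {}
}

// Hypothetical stand-in for ParquetFileFormat.
final class Client implements Serializable {
  // Serialized with every Client, so a JVM that deserializes a Client must also
  // create a Redirector instance, which initializes the Redirector class there.
  private final Redirector redirector = Redirector.INSTANCE;
}

public class InitOnDeserializeSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new ObjectOutputStream(bytes).writeObject(new Client());
    // In a fresh JVM (e.g. a Spark executor), this readObject() prints
    // "Redirector class initialized" even though no Redirector constructor runs.
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
  }
}
```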

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 12 additions & 46 deletions
```diff
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.logging.{Logger => JLogger}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -29,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
-import org.slf4j.bridge.SLF4JBridgeHandler
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -56,6 +53,11 @@ class ParquetFileFormat
   with DataSourceRegister
   with Logging
   with Serializable {
+  // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
+  // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
+  // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
+  // here.
+  private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
 
   override def shortName(): String = "parquet"
 
@@ -129,10 +131,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
-    ParquetFileFormat.redirectParquetLogs()
-
     new OutputWriterFactory {
-      override def newInstance(
+      // This OutputWriterFactory instance is deserialized when writing Parquet files on the
+      // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
+      // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
+      // initialized.
+      private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
+
+      override def newInstance(
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
@@ -673,44 +679,4 @@ object ParquetFileFormat extends Logging {
       Failure(cause)
     }.toOption
   }
-
-  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
-  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
-  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
-  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
-  val parquetLogger: JLogger = JLogger.getLogger("parquet")
-
-  // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
-  // we redirect the JUL logger via SLF4J JUL bridge handler.
-  val redirectParquetLogsViaSLF4J: Unit = {
-    def redirect(logger: JLogger): Unit = {
-      logger.getHandlers.foreach(logger.removeHandler)
-      logger.setUseParentHandlers(false)
-      logger.addHandler(new SLF4JBridgeHandler)
-    }
-
-    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
-    // scalastyle:off classforname
-    Class.forName(classOf[ApacheParquetLog].getName)
-    // scalastyle:on classforname
-    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
-
-    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
-    // namespace.
-    try {
-      // scalastyle:off classforname
-      Class.forName("parquet.Log")
-      // scalastyle:on classforname
-      redirect(JLogger.getLogger("parquet"))
-    } catch { case _: Throwable =>
-      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
-      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
-      // should be removed after this issue is fixed.
-    }
-  }
-
-  /**
-   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
-   */
-  def redirectParquetLogs(): Unit = {}
 }
```
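Taken together, the two `parquetLogRedirector` fields cover both code paths: the field on `ParquetFileFormat` itself triggers redirection whenever the format is constructed or deserialized (the read path), while the field on the anonymous `OutputWriterFactory` covers executors that deserialize only the factory when writing Parquet files.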

sql/core/src/test/resources/log4j.properties

Lines changed: 2 additions & 2 deletions
```diff
@@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
 # Parquet related logging
-log4j.logger.org.apache.parquet.hadoop=WARN
-log4j.logger.org.apache.spark.sql.parquet=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
```

sql/hive/src/test/resources/log4j.properties

Lines changed: 4 additions & 0 deletions
```diff
@@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF
 
 log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
 log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
+
+# Parquet related logging
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
```
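Both test configurations set the logger level for the two namespaces Parquet has used: `org.apache.parquet` for parquet-mr 1.7/1.8 and the bare `parquet` prefix for 1.6.x and earlier. These log4j settings can only take effect once the JUL records are bridged into SLF4J/log4j, which is what `ParquetLogRedirector` guarantees.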
