
Commit a1519d4

fix json schema inference
1 parent ba46703 commit a1519d4

File tree

3 files changed, +15 -5 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 0 deletions

@@ -33,6 +33,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 4 additions & 0 deletions

@@ -105,6 +105,10 @@ object SQLExecution {
     }
   }
 
+  /**
+   * Wrap an action with specified SQL configs. These configs will be propagated to the executor
+   * side via job local properties.
+   */
   def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
     val sc = sparkSession.sparkContext
     // Set all the specified SQL configs to local properties, so that they can be available at
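The new doc comment names the mechanism: the session's SQL configs are copied into job local properties, so code running on executors can observe them. A minimal sketch of how a caller might exercise this, assuming SQLExecution is reachable from user code (it lives in org.apache.spark.sql.execution); the config key, job, and object name below are illustrative, and exactly which configs get mirrored into local properties is an assumption, not something this hunk shows:

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution

object ConfPropagationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("conf-propagation-sketch")
      .getOrCreate()
    // An explicitly set SQL config that the wrapper should carry to executors.
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    SQLExecution.withSQLConfPropagated(spark) {
      // Jobs triggered inside the wrapped body run with the session's SQL
      // configs attached as job local properties, readable via TaskContext.
      val seen = spark.sparkContext.parallelize(0 until 2, 2).map { _ =>
        Option(TaskContext.get().getLocalProperty("spark.sql.session.timeZone"))
          .getOrElse("unset")
      }.collect()
      println(seen.mkString(", ")) // under the assumption above: UTC, UTC
    }

    spark.stop()
  }
}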

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala

Lines changed: 10 additions & 5 deletions

@@ -45,8 +45,9 @@ private[sql] object JsonInferSchema {
     val parseMode = configOptions.parseMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
 
-    // perform schema inference on each row and merge afterwards
-    val rootType = json.mapPartitions { iter =>
+    // In each RDD partition, perform schema inference on each row and merge afterwards.
+    val typeMerger = compatibleRootType(columnNameOfCorruptRecord, parseMode)
+    val mergedTypesFromPartitions = json.mapPartitions { iter =>
       val factory = new JsonFactory()
       configOptions.setJacksonOptions(factory)
       iter.flatMap { row =>
@@ -66,9 +67,13 @@
                 s"Parse Mode: ${FailFastMode.name}.", e)
           }
         }
-      }
-    }.fold(StructType(Nil))(
-      compatibleRootType(columnNameOfCorruptRecord, parseMode))
+      }.reduceOption(typeMerger).toIterator
+    }
+
+    // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
+    // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
+    // active SparkSession and `SQLConf.get` may point to the wrong configs.
+    val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
 
     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
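The added comment is the heart of the fix: `RDD.fold` merges partition results on the DAGScheduler event loop thread, where there is no active SparkSession and `SQLConf.get` can resolve to the wrong configs, so the final merge is moved onto the calling thread via `toLocalIterator`. A self-contained sketch of the same pattern, with a toy merge function standing in for the commit's `compatibleRootType` (the names here, like unionMerger and LocalFoldSketch, are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object LocalFoldSketch {
  // Toy stand-in for compatibleRootType: union the fields of two structs.
  val unionMerger: (DataType, DataType) => DataType = {
    case (s1: StructType, s2: StructType) =>
      StructType((s1.fields ++ s2.fields).distinct)
    case (t, _) => t
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("local-fold-sketch")
      .getOrCreate()

    // One inferred type per input row, spread over two partitions.
    val perRow = spark.sparkContext.parallelize(Seq[DataType](
      StructType(Seq(StructField("a", LongType))),
      StructType(Seq(StructField("b", StringType)))
    ), numSlices = 2)

    // Merge within each partition on the executors; empty partitions
    // contribute no element, thanks to reduceOption.
    val mergedPerPartition = perRow.mapPartitions { iter =>
      iter.reduceOption(unionMerger).toIterator
    }

    // Pull at most one result per partition back to this thread and fold
    // here, instead of calling RDD.fold, so the merge function never runs
    // inside the scheduler's event loop thread.
    val rootType =
      mergedPerPartition.toLocalIterator.fold(StructType(Nil): DataType)(unionMerger)
    println(rootType) // a StructType containing fields a and b

    spark.stop()
  }
}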
