```diff
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
```
```diff
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
      }.reduceOption(typeMerger).toIterator
    }
 
-    // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-    // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-    // active SparkSession and `SQLConf.get` may point to the wrong configs.
-    val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+    // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+    // the fold functions in the scheduler event loop thread.
+    val existingConf = SQLConf.get
+    var rootType: DataType = StructType(Nil)
+    val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
```
**Member:** Do we need to call `sc.clean(typeMerger)` manually here?

**Contributor Author:** This closure is defined by us, and I don't think we leak an outer reference here. If we do, it's a bug and we should fix it.

**Member:** Yeah, agreed.

```diff
+    val mergeResult = (index: Int, taskResult: DataType) => {
+      rootType = SQLConf.withExistingConf(existingConf) {
```
**Contributor:** Just a question: wouldn't

```scala
val partitionsResult = json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition)
partitionsResult.fold(StructType(Nil))(typeMerger)
```

do the same without requiring these changes?

**Contributor Author (@cloud-fan), Aug 21, 2018:** This can work, but the problem is that we would have to keep a large result array, which can cause GC problems.

**Contributor:** It would contain one result per partition; do you think that is enough to cause GC problems?

**Contributor Author:** The schema can be very complex (e.g. a very wide and deep schema).

**Contributor:** Yes, that makes sense, thanks.

**Member:** The same question was on my mind; thanks for the clarification.

```diff
+        typeMerger(rootType, taskResult)
+      }
+    }
+    json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult)
 
     canonicalizeType(rootType, configOptions) match {
       case Some(st: StructType) => st
```
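For readers unfamiliar with this `runJob` overload, here is a minimal, self-contained sketch of the fold-via-`runJob` pattern (an illustration, not the PR's code), using `Int` sums in place of `DataType` merging. `runJob` hands each partition's result to the merge handler on the driver as it arrives, so the driver keeps only one running value instead of the per-partition result array discussed in the thread above. The object and value names are invented for the example.

```scala
import org.apache.spark.sql.SparkSession

// A sketch of the fold-via-runJob pattern; Int sums stand in for DataType merging.
object FoldViaRunJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("fold-demo").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 1000, numSlices = 8)

    // Runs on executors: folds each partition down to a single value.
    val foldPartition = (iter: Iterator[Int]) => iter.sum
    // Runs on the driver as each task finishes: merges results incrementally,
    // so no 8-element result array is ever materialized.
    var total = 0
    val mergeResult = (index: Int, taskResult: Int) => total += taskResult

    sc.runJob(rdd, foldPartition, mergeResult)
    assert(total == (1 to 1000).sum)
    spark.stop()
  }
}
```

The driver-side `mergeResult` handler is exactly where the scheduler-event-loop-thread problem arises, which is why the PR pairs it with `SQLConf.withExistingConf`.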
```diff
@@ -82,6 +82,19 @@ object SQLConf
   /** See [[get]] for more information. */
   def getFallbackConf: SQLConf = fallbackConf.get()
 
+  private lazy val existingConf = new ThreadLocal[SQLConf] {
+    override def initialValue: SQLConf = null
+  }
+
+  def withExistingConf[T](conf: SQLConf)(f: => T): T = {
+    existingConf.set(conf)
+    try {
+      f
+    } finally {
+      existingConf.remove()
+    }
+  }
+
   /**
    * Defines a getter that returns the SQLConf within scope.
    * See [[get]] for more information.
```
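For context, below is a standalone sketch of the capture-and-reinstall pattern that `withExistingConf` enables: thread-local state is captured on the calling thread, reinstalled inside a block that runs on a different thread, and cleaned up in a `finally`. This is plain Scala with invented names (`ThreadLocalHandoff`, `withValue`), not Spark code.

```scala
// Standalone illustration of the withExistingConf pattern.
object ThreadLocalHandoff {
  private val current = new ThreadLocal[String] {
    override def initialValue(): String = null
  }

  // Install `v` for the duration of `f`, mirroring SQLConf.withExistingConf.
  def withValue[T](v: String)(f: => T): T = {
    current.set(v)
    try f finally current.remove()
  }

  def main(args: Array[String]): Unit = {
    val captured = "conf-from-caller"    // like `val existingConf = SQLConf.get`
    val worker = new Thread(() => {      // stands in for the scheduler event loop thread
      assert(current.get() == null)      // thread-locals do not cross threads by themselves
      withValue(captured) {
        assert(current.get() == "conf-from-caller")
      }
      assert(current.get() == null)      // removed again after the block
    })
    worker.start()
    worker.join()
  }
}
```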
```diff
@@ -116,16 +129,24 @@ object SQLConf
     if (TaskContext.get != null) {
       new ReadOnlySQLConf(TaskContext.get())
     } else {
-      if (Utils.isTesting && SparkContext.getActive.isDefined) {
+      val isSchedulerEventLoopThread = SparkContext.getActive
+        .map(_.dagScheduler.eventProcessLoop.eventThread)
+        .exists(_.getId == Thread.currentThread().getId)
+      if (isSchedulerEventLoopThread) {
         // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
-        // will return `fallbackConf` which is unexpected. Here we prevent it from happening.
-        val schedulerEventLoopThread =
-          SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
-        if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
+        // will return `fallbackConf` which is unexpected. Here we require the caller to get the
+        // conf within `withExistingConf`, otherwise fail the query.
+        val conf = existingConf.get()
+        if (conf != null) {
+          conf
+        } else if (Utils.isTesting) {
           throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
-        }
+        } else {
+          confGetter.get()()
+        }
+      } else {
+        confGetter.get()()
       }
-      confGetter.get()()
     }
   }
```
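The guard above hinges on identifying the scheduler event loop thread by comparing thread ids. A toy sketch of that check follows; the `DemoEventLoop` class and all names are invented for illustration, and Spark's DAGScheduler event loop is the real-world analogue.

```scala
import java.util.concurrent.LinkedBlockingQueue

// A minimal event loop thread that drains a queue of tasks.
final class DemoEventLoop extends Thread("demo-event-loop") {
  private val queue = new LinkedBlockingQueue[Runnable]()
  @volatile private var running = true
  def post(task: Runnable): Unit = queue.put(task)
  def shutdown(): Unit = post(() => running = false)
  override def run(): Unit = while (running) queue.take().run()
}

object ThreadCheckDemo {
  def main(args: Array[String]): Unit = {
    val loop = new DemoEventLoop
    loop.start()
    // Same shape as `isSchedulerEventLoopThread` above: compare thread ids.
    def onEventLoop: Boolean = loop.getId == Thread.currentThread().getId
    assert(!onEventLoop)                  // main thread: the guard would not trigger
    loop.post(() => assert(onEventLoop))  // inside the loop: the guard would trigger
    loop.shutdown()
    loop.join()
  }
}
```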

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (24 additions & 0 deletions)
```diff
@@ -27,6 +27,7 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
```
```diff
@@ -2528,4 +2529,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       checkAnswer(aggPlusFilter1, aggPlusFilter2.collect())
     }
   }
+
+  test("SPARK-25159: json schema inference should only trigger one job") {
+    withTempPath { path =>
+      // This test is to prove that `JsonInferSchema` does not use `RDD#toLocalIterator`,
+      // which triggers one Spark job per RDD partition.
+      Seq(1 -> "a", 2 -> "b").toDF("i", "p")
+        // The data set has 2 partitions, so Spark will write at least 2 json files.
+        // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
+        // partitions.
+        .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath)
+
+      var numJobs = 0
+      sparkContext.addSparkListener(new SparkListener {
+        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+          numJobs += 1
+        }
+      })
+
+      val df = spark.read.json(path.getCanonicalPath)
+      assert(df.columns === Array("i", "p"))
+      // Listener events are delivered asynchronously; drain the bus before checking the counter.
+      sparkContext.listenerBus.waitUntilEmpty(10000)
+      assert(numJobs == 1)
+    }
+  }
 }
```
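The test's reliance on gzip being non-splittable can be checked independently. Below is a hedged, standalone sketch (the object name is invented, and the exact partition count can vary with `spark.default.parallelism` and file-packing settings) that writes tiny gzip JSON files from a 2-partition DataFrame and inspects how many partitions the scan RDD gets.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Gzip files cannot be split, so each output file is read whole; with default
// settings this typically yields at least 2 scan partitions here.
object GzipPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("gzip-demo").getOrCreate()
    import spark.implicits._

    val out = Files.createTempDirectory("gzip-json").resolve("data").toString
    Seq(1 -> "a", 2 -> "b").toDF("i", "p")
      .write.partitionBy("p").option("compression", "gzip").json(out)

    val df = spark.read.json(out)
    println(s"columns = ${df.columns.mkString(", ")}")       // i, p
    println(s"scan partitions = ${df.rdd.getNumPartitions}") // typically >= 2
    spark.stop()
  }
}
```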