Commit 13279e6
Refactor the logic of eagerly processing SET commands.
Parent: b14b83e

2 files changed, 23 additions and 24 deletions
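In Spark SQL, a SET command (for example `SET key=value`) records a configuration pair rather than producing rows, so it must take effect eagerly, before any results are requested; ordinary queries stay lazy until their results are extracted. This commit consolidates that eager handling into one helper shared by SQLContext and HiveContext. As an illustration only of the code path being refactored (the property name and table name here are hypothetical, not part of this commit):

    // Illustrative usage sketch; the key and table names are made up.
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    sqlContext.sql("SET spark.sql.shuffle.partitions=10") // side effect applied eagerly
    val rows = sqlContext.sql("SELECT * FROM records").collect() // evaluated lazily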

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 17 additions & 8 deletions
@@ -272,6 +272,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
+    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
+      case SetCommand(key, value) =>
+        // Only this case needs to be executed eagerly. The other cases will
+        // be taken care of when the actual results are being extracted.
+        // In the case of HiveContext, sqlConf is overridden to also pass the
+        // pair into its HiveConf.
+        if (key.isDefined && value.isDefined) {
+          sqlConf.set(key.get, value.get)
+        }
+        // It doesn't matter what we return here, since this is only used
+        // to force the evaluation to happen eagerly. To query the results,
+        // one must use SchemaRDD operations to extract them.
+        emptyResult
+      case _ => executedPlan.execute()
+    }
+
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
@@ -281,14 +297,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     /** Internal version of the RDD. Avoids copies and has no schema */
     lazy val toRdd: RDD[Row] = {
       logical match {
-        case SetCommand(key, value) =>
-          if (key.isDefined && value.isDefined) {
-            sqlConf.set(key.get, value.get)
-          }
-          // It doesn't matter what we return here, since toRdd is used
-          // to force the evaluation happen eagerly. To query the results,
-          // one must use SchemaRDD operations to extract them.
-          emptyResult
+        case s: SetCommand => eagerlyProcess(s)
         case _ => executedPlan.execute()
       }
     }
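The helper's write goes through `sqlConf`, which the comment notes HiveContext overrides so the same pair also lands in its HiveConf. Below is a minimal, self-contained sketch of that hook pattern, using simplified stand-in types rather than Spark's real classes (and a `setConf` method where Spark overrides the `sqlConf` member itself):

    import scala.collection.mutable

    // Stand-in for a SET command plan node; not Spark's SetCommand.
    case class SetCmd(key: Option[String], value: Option[String])

    class BaseContext {
      protected val conf = mutable.Map.empty[String, String]

      // Overridable hook, playing the role of sqlConf in the diff above.
      protected def setConf(key: String, value: String): Unit = conf(key) = value

      // Eagerly record the pair; the empty return value exists only to
      // force evaluation, mirroring emptyResult above.
      def eagerlyProcess(cmd: SetCmd): Seq[String] = {
        for (k <- cmd.key; v <- cmd.value) setConf(k, v)
        Seq.empty
      }
    }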

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 6 additions & 16 deletions
@@ -257,7 +257,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
 
   override lazy val toRdd: RDD[Row] = {
-
     def processCmd(cmd: String): RDD[Row] = {
       val output = runSqlHive(cmd)
       if (output.size == 0) {
@@ -268,21 +267,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       }
     }
 
-    analyzed match {
-      case SetCommand(key, value) =>
-        // Record the set command inside SQLConf, as well as have Hive execute it.
-        if (key.isDefined && value.isDefined) {
-          sqlConf.set(key.get, value.get)
-          processCmd(s"SET $key=$value")
-        }
-        // Only the above case needs to be executed in Hive eagerly (i.e. now).
-        // The other cases will be taken care of when the actual results are
-        // being extracted.
-        emptyResult
-      case NativeCommand(cmd) =>
-        processCmd(cmd)
-      case _ =>
-        executedPlan.execute().map(_.copy())
+    logical match {
+      case s: SetCommand => eagerlyProcess(s)
+      case _ => analyzed match {
+        case NativeCommand(cmd) => processCmd(cmd)
+        case _ => executedPlan.execute().map(_.copy())
+      }
     }
   }
 
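With the dispatch shared in the base class, HiveContext's only remaining job for SET commands is to forward each pair to Hive as well. Continuing the hypothetical sketch from the SQLContext section above (it reuses `BaseContext` and the `mutable` import), a subclass overrides the hook instead of re-matching on the plan:

    // Hypothetical subclass mirroring HiveContext: record the pair locally
    // and also pass it into a Hive-side configuration.
    class HiveLikeContext extends BaseContext {
      private val hiveConf = mutable.Map.empty[String, String]

      override protected def setConf(key: String, value: String): Unit = {
        super.setConf(key, value) // the sqlConf analogue
        hiveConf(key) = value     // the HiveConf analogue
      }
    }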
