@@ -30,16 +30,20 @@ import org.apache.hadoop.hive.ql.Driver
3030import org .apache .hadoop .hive .ql .processors ._
3131import org .apache .hadoop .hive .ql .session .SessionState
3232
33- import org .apache .spark .annotation .DeveloperApi
3433import org .apache .spark .SparkContext
3534import org .apache .spark .rdd .RDD
3635import org .apache .spark .sql .catalyst .analysis .{Analyzer , OverrideCatalog }
3736import org .apache .spark .sql .catalyst .expressions .GenericRow
38- import org .apache .spark .sql .catalyst .plans .logical .{LogicalPlan , LowerCaseSchema }
39- import org .apache .spark .sql .catalyst .plans .logical .{NativeCommand , ExplainCommand }
37+ import org .apache .spark .sql .catalyst .plans .logical ._
4038import org .apache .spark .sql .catalyst .ScalaReflection
4139import org .apache .spark .sql .catalyst .types ._
4240import org .apache .spark .sql .execution ._
41+ import org .apache .spark .sql .catalyst .types .StructType
42+ import org .apache .spark .sql .catalyst .plans .logical .NativeCommand
43+ import org .apache .spark .sql .catalyst .plans .logical .ExplainCommand
44+ import org .apache .spark .sql .catalyst .types .ArrayType
45+ import org .apache .spark .sql .catalyst .plans .logical .LowerCaseSchema
46+ import org .apache .spark .sql .catalyst .types .MapType
4347
4448/* Implicit conversions */
4549import scala .collection .JavaConversions ._
@@ -168,7 +172,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
168172 /**
169173 * Execute the command using Hive and return the results as a sequence. Each element
170174 * in the sequence is one row.
171- */
175+ */ `````
172176 protected def runHive (cmd : String , maxRows : Int = 1000 ): Seq [String ] = {
173177 try {
174178 val cmd_trimmed : String = cmd.trim()
@@ -240,20 +244,30 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
240244 override lazy val optimizedPlan =
241245 optimizer(catalog.PreInsertionCasts (catalog.CreateTables (analyzed)))
242246
243- override lazy val toRdd : RDD [Row ] =
247+ override lazy val toRdd : RDD [Row ] = {
248+
249+ def processCmd (cmd : String ): RDD [Row ] = {
250+ val output = runSqlHive(cmd)
251+ if (output.size == 0 ) {
252+ emptyResult
253+ } else {
254+ val asRows = output.map(r => new GenericRow (r.split(" \t " ).asInstanceOf [Array [Any ]]))
255+ sparkContext.parallelize(asRows, 1 )
256+ }
257+ }
258+
244259 analyzed match {
260+ case SetCommand (key, value) =>
261+ logger.debug(" inside Hive's toRdd -- matched SetCommand" )
262+ // Record the set command inside SQLConf, as well as have Hive execute it.
263+ sqlConf.set(key, value)
264+ processCmd(s " set $key= $value" )
245265 case NativeCommand (cmd) =>
246- val output = runSqlHive(cmd)
247-
248- if (output.size == 0 ) {
249- emptyResult
250- } else {
251- val asRows = output.map(r => new GenericRow (r.split(" \t " ).asInstanceOf [Array [Any ]]))
252- sparkContext.parallelize(asRows, 1 )
253- }
266+ processCmd(cmd)
254267 case _ =>
255268 executedPlan.execute().map(_.copy())
256269 }
270+ }
257271
258272 protected val primitiveTypes =
259273 Seq (StringType , IntegerType , LongType , DoubleType , FloatType , BooleanType , ByteType ,
@@ -312,6 +326,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
312326 override def simpleString : String =
313327 logical match {
314328 case _ : NativeCommand => " <Executed by Hive>"
329+ case _ : SetCommand => " <Set Command: Executed by Hive, and noted by SQLContext>"
315330 case _ => executedPlan.toString
316331 }
317332 }
0 commit comments