@@ -119,7 +119,8 @@ case class InsertIntoHiveTable(
119119 // Doesn't work in Scala 2.9 due to what may be a generics bug
120120 // TODO: Should we uncomment this for Scala 2.10?
121121 // conf.setOutputFormat(outputFormatClass)
122- conf.value.set(" mapred.output.format.class" , fileSinkConf.getTableInfo.getOutputFileFormatClassName)
122+ conf.value.set(" mapred.output.format.class" ,
123+ fileSinkConf.getTableInfo.getOutputFileFormatClassName)
123124 if (isCompressed) {
124125 // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
125126 // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
@@ -136,7 +137,7 @@ case class InsertIntoHiveTable(
136137 SparkHiveHadoopWriter .createPathFromString(fileSinkConf.getDirName, conf.value))
137138 log.debug(" Saving as hadoop file of type " + valueClass.getSimpleName)
138139 var writer : SparkHiveHadoopWriter = null
139- // Map restore writesr for Dynamic Partition
140+ // Map restore writesr for Dynamic Partition
140141 var writerMap : scala.collection.mutable.HashMap [String , SparkHiveHadoopWriter ] = null
141142 if (dynamicPartNum == 0 ) {
142143 writer = new SparkHiveHadoopWriter (conf.value, fileSinkConf)
@@ -169,7 +170,8 @@ case class InsertIntoHiveTable(
169170 writer2 = writerMap.get(record._2) match {
170171 case Some (writer)=> writer
171172 case None => {
172- val tempWriter = new SparkHiveHadoopWriter (conf.value, new FileSinkDesc (partLocation, fileSinkConf.getTableInfo, false ))
173+ val tempWriter = new SparkHiveHadoopWriter (conf.value,
174+ new FileSinkDesc (partLocation, fileSinkConf.getTableInfo, false ))
173175 tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
174176 tempWriter.open(record._2)
175177 writerMap += (record._2 -> tempWriter)
@@ -211,8 +213,11 @@ case class InsertIntoHiveTable(
211213 * return: /part2=val2
212214 * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
213215 * return: /part2=val2/part3=val3
214- * */
215- private def getDynamicPartDir (partCols : Array [String ], row : Row , dynamicPartNum : Int , defaultPartName : String ): String = {
216+ */
217+ private def getDynamicPartDir (partCols : Array [String ],
218+ row : Row ,
219+ dynamicPartNum : Int ,
220+ defaultPartName : String ): String = {
216221 assert(dynamicPartNum > 0 )
217222 partCols
218223 .takeRight(dynamicPartNum)
@@ -269,7 +274,8 @@ case class InsertIntoHiveTable(
269274 if (! sc.hiveconf.getBoolVar(HiveConf .ConfVars .DYNAMICPARTITIONING )) {
270275 throw new SparkException (ErrorMsg .DYNAMIC_PARTITION_DISABLED .getMsg())
271276 }
272- if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf .ConfVars .DYNAMICPARTITIONINGMODE ).equalsIgnoreCase(" strict" )) {
277+ if (numStaPart == 0 &&
278+ sc.hiveconf.getVar(HiveConf .ConfVars .DYNAMICPARTITIONINGMODE ).equalsIgnoreCase(" strict" )) {
273279 throw new SparkException (ErrorMsg .DYNAMIC_PARTITION_STRICT_MODE .getMsg())
274280 }
275281 // check if static partition appear after dynamic partitions
@@ -294,7 +300,8 @@ case class InsertIntoHiveTable(
294300
295301 val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
296302 val outputData = new Array [Any ](fieldOIs.length)
297- val defaultPartName = jobConfSer.value.get(" hive.exec.default.partition.name " , " __HIVE_DEFAULT_PARTITION__" )
303+ val defaultPartName = jobConfSer.value.get(
304+ " hive.exec.default.partition.name " , " __HIVE_DEFAULT_PARTITION__" )
298305 var partColStr : Array [String ] = null ;
299306 if (fileSinkConf.getTableInfo.getProperties.getProperty(" partition_columns" ) != null ) {
300307 partColStr = fileSinkConf
0 commit comments