
Commit a2374a8

baishuoliancheng authored and committed

Update InsertIntoHiveTable.scala

1 parent 701a814 · commit a2374a8

File tree

1 file changed: +130 -20 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 130 additions & 20 deletions
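
This change teaches InsertIntoHiveTable to handle dynamic partitions: partition columns left unbound in the PARTITION clause take their values from the trailing columns of each row, and each distinct value is written to its own partition directory. A minimal sketch of the query shape this targets, against a hypothetical logs table (the table names and the sc SparkContext are illustrative assumptions, not part of the commit):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc) // sc: an existing SparkContext

    // Hypothetical table: one data column (msg), two partition columns (ds, hr).
    hiveContext.sql(
      "CREATE TABLE IF NOT EXISTS logs (msg STRING) PARTITIONED BY (ds STRING, hr STRING)")

    // ds is bound statically; hr is dynamic, so each row's trailing hr value
    // decides which partition directory the row lands in.
    hiveContext.sql(
      "INSERT OVERWRITE TABLE logs PARTITION (ds='2014-09-01', hr) " +
      "SELECT msg, hr FROM staging_logs")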
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharOb
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
@@ -159,6 +159,30 @@ case class InsertIntoHiveTable(
     writer.commitJob()
   }
 
+  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int): String = {
+    println("tableInfo.class:" + tableInfo.getClass + "|row(2):" + row(2))
+    println(tableInfo.getProperties.getProperty("columns") + "|" + tableInfo.getProperties.getProperty("partition_columns"))
+    dynamicPartNum2 match {
+      case 0 => ""
+      case i => {
+        val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
+        val partColStr = tableInfo.getProperties.getProperty("partition_columns")
+        val partCols = partColStr.split("/")
+        var buf = new StringBuffer()
+        if (partCols.length == dynamicPartNum2) {
+          for (j <- 0 until partCols.length) {
+            buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum))
+          }
+        } else {
+          for (j <- 0 until dynamicPartNum2) {
+            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum))
+          }
+        }
+        buf.toString
+      }
+    }
+  }
+
   override def execute() = result
 
   /**
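
Review note: getDynamicPartDir turns a row's trailing columns into a "/name=value" partition path. One thing to double-check in the all-dynamic branch: the index j + row.length - colsNum works out to j + dynamicPartNum2 (rows carry colsNum data columns plus the dynamic values), whereas the mixed branch uses the more direct j + colsNum; the two only agree when the table has as many data columns as dynamic partitions. A standalone sketch of the intended mapping, with hypothetical column metadata:

    // Hypothetical inputs: one data column (msg), two dynamic partition columns.
    val partCols = Array("ds", "hr")                // parsed from "partition_columns"
    val colsNum  = 1                                // data columns, from "columns"
    val row      = Seq("hello", "2014-09-01", "12") // msg first, then ds and hr values

    // Each dynamic partition column contributes a "/name=value" path segment.
    val path = partCols.indices
      .map(j => "/" + partCols(j) + "=" + row(colsNum + j))
      .mkString
    println(path)                                   // prints "/ds=2014-09-01/hr=12"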
@@ -178,6 +202,12 @@ case class InsertIntoHiveTable(
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    var dynamicPartNum = 0
+    var dynamicPartPath = ""
+    val partitionSpec = partition.map {
+      case (key, Some(value)) => key -> value
+      case (key, None) => { dynamicPartNum += 1; key -> "" } // A None value marks a dynamic partition column.
+    }
     val rdd = childRdd.mapPartitions { iter =>
       val serializer = newSerializer(fileSinkConf.getTableInfo)
       val standardOI = ObjectInspectorUtils
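
The partitionSpec mapping now does double duty: it still builds the key-to-value map the load step needs, and it counts the unbound keys as a side effect. A self-contained sketch with illustrative values:

    // For PARTITION (ds='2014-09-01', hr) the operator receives ds bound and hr unbound.
    val partition: Map[String, Option[String]] =
      Map("ds" -> Some("2014-09-01"), "hr" -> None)

    var dynamicPartNum = 0
    val partitionSpec = partition.map {
      case (key, Some(value)) => key -> value
      case (key, None) => { dynamicPartNum += 1; key -> "" } // unbound => dynamic
    }
    println(dynamicPartNum) // 1
    println(partitionSpec)  // Map(ds -> 2014-09-01, hr -> )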
@@ -191,7 +221,10 @@ case class InsertIntoHiveTable(
       val outputData = new Array[Any](fieldOIs.length)
       iter.map { row =>
         var i = 0
-        while (i < row.length) {
+        while (i < fieldOIs.length) {
+          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
+            dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum)
+          }
           // Casts Strings to HiveVarchars when necessary.
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
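
Review note: dynamicPartPath is a var captured both by this serializing closure and by writeToFile2 below. The writer consumes the mapped iterator lazily, so each row appears to update dynamicPartPath just before it is written, which is what lets the writer pick a destination per record; it also makes the two closures order-dependent within a task.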
@@ -204,12 +237,81 @@ case class InsertIntoHiveTable(
     // ORC stores compression information in table properties. While, there are other formats
     // (e.g. RCFile) that rely on hadoop configurations to store compression information.
     val jobConf = new JobConf(sc.hiveconf)
-    saveAsHiveFile(
-      rdd,
-      outputClass,
-      fileSinkConf,
-      jobConf,
-      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
+    val jobConfSer = new SerializableWritable(jobConf)
+    if (dynamicPartNum > 0) {
+      if (outputClass == null) {
+        throw new SparkException("Output value class not set")
+      }
+      jobConfSer.value.setOutputValueClass(outputClass)
+      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
+        throw new SparkException("Output format class not set")
+      }
+      // Doesn't work in Scala 2.9 due to what may be a generics bug
+      // TODO: Should we uncomment this for Scala 2.10?
+      // conf.setOutputFormat(outputFormatClass)
+      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
+        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
+        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
+        // to store compression information.
+        jobConfSer.value.set("mapred.output.compress", "true")
+        fileSinkConf.setCompressed(true)
+        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
+        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
+      }
+      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
+
+      FileOutputFormat.setOutputPath(
+        jobConfSer.value,
+        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
+
+      var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
+      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
+        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+        // around by taking a mod. We expect that no task will be attempted 2 billion times.
+        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+        val serializer = newSerializer(fileSinkConf.getTableInfo)
+        var count = 0
+        var writer2: SparkHiveHadoopWriter = null
+        while (iter.hasNext) {
+          val record = iter.next()
+          val location = fileSinkConf.getDirName
+          val partLocation = location + dynamicPartPath
+          writer2 = writerMap.get(dynamicPartPath) match {
+            case Some(writer) => writer
+            case None => {
+              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
+              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
+              tempWriter.open(dynamicPartPath)
+              writerMap += (dynamicPartPath -> tempWriter)
+              tempWriter
+            }
+          }
+          count += 1
+          writer2.write(record)
+        }
+        for ((k, v) <- writerMap) {
+          v.close()
+          v.commit()
+        }
+      }
+
+      sc.sparkContext.runJob(rdd, writeToFile2 _)
+
+      for ((k, v) <- writerMap) {
+        v.commitJob()
+      }
+      writerMap.clear()
+      // writer.commitJob()
+
+    } else {
+      saveAsHiveFile(
+        rdd,
+        outputClass,
+        fileSinkConf,
+        jobConf,
+        sc.hiveconf.getBoolean("hive.exec.compress.output", false))
+    }
 
     // TODO: Handle dynamic partitioning.
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
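
The heart of writeToFile2 is a writer-per-partition cache: the first record bound for a partition opens a SparkHiveHadoopWriter rooted at that partition's directory, and later records reuse it. The Some/None match is the hand-rolled form of getOrElseUpdate; a reduced sketch of the pattern, with Writer standing in for SparkHiveHadoopWriter:

    import scala.collection.mutable

    class Writer(val path: String) {
      def write(record: String): Unit = println(path + " <- " + record)
    }

    val writers = mutable.HashMap.empty[String, Writer]
    def writerFor(partPath: String): Writer =
      writers.getOrElseUpdate(partPath, new Writer(partPath)) // open once per partition

    writerFor("/ds=2014-09-01/hr=12").write("a") // first call opens the writer
    writerFor("/ds=2014-09-01/hr=12").write("b") // second call reuses it

One caveat worth flagging: writerMap is created on the driver but populated inside the task closure, so the copies filled in on executors are not the driver-side map that the post-runJob commitJob loop iterates; in cluster mode that loop would see an empty map.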
@@ -220,25 +322,33 @@ case class InsertIntoHiveTable(
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
-      val partitionSpec = partition.map {
-        case (key, Some(value)) => key -> value
-        case (key, None) => key -> "" // Should not reach here right now.
-      }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
       db.validatePartitionNameCharacters(partVals)
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
-      db.loadPartition(
-        outputPath,
-        qualifiedTableName,
-        partitionSpec,
-        overwrite,
-        holdDDLTime,
-        inheritTableSpecs,
-        isSkewedStoreAsSubdir)
+      if (dynamicPartNum > 0) {
+        db.loadDynamicPartitions(
+          outputPath,
+          qualifiedTableName,
+          partitionSpec,
+          overwrite,
+          dynamicPartNum /* dpCtx.getNumDPCols() */,
+          holdDDLTime,
+          isSkewedStoreAsSubdir)
+      } else {
+        db.loadPartition(
+          outputPath,
+          qualifiedTableName,
+          partitionSpec,
+          overwrite,
+          holdDDLTime,
+          inheritTableSpecs,
+          isSkewedStoreAsSubdir)
+      }
     } else {
       db.loadTable(
         outputPath,
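
Review note: the load step now branches on dynamicPartNum. Hive's loadDynamicPartitions discovers the /name=value subdirectories written under outputPath and registers each one as a partition; unlike loadPartition it takes the count of dynamic partition columns rather than a fully bound spec, and the empty-string values left in partitionSpec mark which keys are dynamic.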
