@@ -33,18 +33,16 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.TaskType
 
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
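This hunk drops the blanket `scala.collection.JavaConversions._` implicits in favour of `scala.collection.JavaConverters._`, so Java collections now have to be converted explicitly with `.asScala` (as the later hunks do for `getAllStructFieldRefs`). A minimal sketch of that explicit style, using a made-up `javaFields` list rather than anything from this file:

```scala
import scala.collection.JavaConverters._

object AsScalaSketch {
  def main(args: Array[String]): Unit = {
    // A Java list, like the one returned by StructObjectInspector.getAllStructFieldRefs.
    val javaFields: java.util.List[String] = java.util.Arrays.asList("key", "value")

    // With JavaConverters the conversion is explicit and visible at the call site.
    val fieldNames = javaFields.asScala.map(_.toUpperCase).toArray

    println(fieldNames.mkString(", ")) // KEY, VALUE
  }
}
```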
@@ -171,7 +169,7 @@ private[hive] class SparkHiveWriterContainer(
         ObjectInspectorCopyOption.JAVA)
       .asInstanceOf[StructObjectInspector]
 
-    val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+    val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
     val dataTypes = inputSchema.map(_.dataType)
     val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
    val outputData = new Array[Any](fieldOIs.length)
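For orientation, the unchanged lines around this hunk build one wrapper function per column from its field ObjectInspector and Catalyst data type, then reuse a single `outputData` buffer for every row. A rough stand-alone sketch of that shape, with a hypothetical `Converter` alias standing in for Hive's inspectors and `wrapperFor`:

```scala
object WrapperSketch {
  // Hypothetical stand-in for the per-column wrappers built via wrapperFor in the patch.
  type Converter = Any => Any

  def main(args: Array[String]): Unit = {
    // One converter per column, built once up front (analogous to `wrappers`).
    val converters: Array[Converter] = Array(v => v.toString.trim, v => v.toString.toInt)

    // A single reusable buffer, analogous to `outputData`.
    val outputData = new Array[Any](converters.length)

    val rows = Seq(Seq(" a ", "1"), Seq("b", "2"))
    rows.foreach { row =>
      var i = 0
      while (i < converters.length) {
        // Null columns stay null; everything else goes through its column's converter.
        outputData(i) = if (row(i) == null) null else converters(i)(row(i))
        i += 1
      }
      println(outputData.mkString(", "))
    }
  }
}
```

Reusing one preallocated array per task avoids allocating a fresh row container for every record written.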
@@ -213,8 +211,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String],
     inputSchema: Seq[Attribute],
-    table: MetastoreRelation,
-    maxOpenFiles: Int)
+    table: MetastoreRelation)
   extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {
 
   import SparkHiveDynamicPartitionWriterContainer._
@@ -246,16 +243,14 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
 
   // this function is executed on executor side
   override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-    val outputWriters = new java.util.HashMap[InternalRow, FileSinkOperator.RecordWriter]
-
     val serializer = newSerializer(fileSinkConf.getTableInfo)
     val standardOI = ObjectInspectorUtils
       .getStandardObjectInspector(
         fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
         ObjectInspectorCopyOption.JAVA)
       .asInstanceOf[StructObjectInspector]
 
-    val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+    val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
     val dataTypes = inputSchema.map(_.dataType)
     val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
     val outputData = new Array[Any](fieldOIs.length)
@@ -284,97 +279,49 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      // This will be filled in if we have to fall back on sorting.
-      var sorter: UnsafeKVExternalSorter = null
-      while (iterator.hasNext && sorter == null) {
+      val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter(
+        StructType.fromAttributes(partitionOutput),
+        StructType.fromAttributes(dataOutput),
+        SparkEnv.get.blockManager,
+        TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+      while (iterator.hasNext) {
         val inputRow = iterator.next()
         val currentKey = getPartitionKey(inputRow)
-        var currentWriter = outputWriters.get(currentKey)
+        sorter.insertKV(currentKey, getOutputRow(inputRow))
+      }
 
-        if (currentWriter == null) {
-          if (outputWriters.size < maxOpenFiles) {
-            currentWriter = newOutputWriter(currentKey)
-            outputWriters.put(currentKey.copy(), currentWriter)
-            var i = 0
-            while (i < fieldOIs.length) {
-              outputData(i) = if (inputRow.isNullAt(i)) {
-                null
-              } else {
-                wrappers(i)(inputRow.get(i, dataTypes(i)))
-              }
-              i += 1
+      logInfo(s"Sorting complete. Writing out partition files one at a time.")
+      val sortedIterator = sorter.sortedIterator()
+      var currentKey: InternalRow = null
+      var currentWriter: FileSinkOperator.RecordWriter = null
+      try {
+        while (sortedIterator.next()) {
+          if (currentKey != sortedIterator.getKey) {
+            if (currentWriter != null) {
+              currentWriter.close(false)
            }
-            currentWriter.write(serializer.serialize(outputData, standardOI))
-          } else {
-            logInfo(s"Maximum partitions reached, falling back on sorting.")
-            sorter = new UnsafeKVExternalSorter(
-              StructType.fromAttributes(partitionOutput),
-              StructType.fromAttributes(dataOutput),
-              SparkEnv.get.blockManager,
-              TaskContext.get().taskMemoryManager().pageSizeBytes)
-            sorter.insertKV(currentKey, getOutputRow(inputRow))
+            currentKey = sortedIterator.getKey.copy()
+            logDebug(s"Writing partition: $currentKey")
+            currentWriter = newOutputWriter(currentKey)
          }
-        } else {
+
           var i = 0
           while (i < fieldOIs.length) {
-            outputData(i) = if (inputRow.isNullAt(i)) {
+            outputData(i) = if (sortedIterator.getValue.isNullAt(i)) {
              null
            } else {
-              wrappers(i)(inputRow.get(i, dataTypes(i)))
+              wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i)))
            }
            i += 1
          }
          currentWriter.write(serializer.serialize(outputData, standardOI))
        }
-      }
-
-      // If the sorter is not null that means that we reached the maxFiles above and need to finish
-      // using external sort.
-      if (sorter != null) {
-        while (iterator.hasNext) {
-          val currentRow = iterator.next()
-          sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
-        }
-
-        logInfo(s"Sorting complete. Writing out partition files one at a time.")
-
-        val sortedIterator = sorter.sortedIterator()
-        var currentKey: InternalRow = null
-        var currentWriter: FileSinkOperator.RecordWriter = null
-        try {
-          while (sortedIterator.next()) {
-            if (currentKey != sortedIterator.getKey) {
-              if (currentWriter != null) {
-                currentWriter.close(false)
-              }
-              currentKey = sortedIterator.getKey.copy()
-              logDebug(s"Writing partition: $currentKey")
-
-              // Either use an existing file from before, or open a new one.
-              currentWriter = outputWriters.remove(currentKey)
-              if (currentWriter == null) {
-                currentWriter = newOutputWriter(currentKey)
-              }
-            }
-
-            var i = 0
-            while (i < fieldOIs.length) {
-              outputData(i) = if (sortedIterator.getValue.isNullAt(i)) {
-                null
-              } else {
-                wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i)))
-              }
-              i += 1
-            }
-            currentWriter.write(serializer.serialize(outputData, standardOI))
-          }
-        } finally {
-          if (currentWriter != null) {
-            currentWriter.close(false)
-          }
+      } finally {
+        if (currentWriter != null) {
+          currentWriter.close(false)
        }
      }
-      clearOutputWriters
      commit()
    } catch {
      case cause: Throwable =>
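The rewritten hunk above drops the map of concurrently open writers: every row is first pushed into `UnsafeKVExternalSorter` keyed by its partition values, and the sorted iterator is then drained with at most one `RecordWriter` open, closing it whenever the key changes. A simplified in-memory sketch of that write loop, with plain sorting and a toy `Writer` standing in for the external sorter and Hive's `FileSinkOperator.RecordWriter`:

```scala
object SortedPartitionWriteSketch {
  // Toy writer standing in for FileSinkOperator.RecordWriter.
  final class Writer(partition: String) {
    def write(value: String): Unit = println(s"[$partition] $value")
    def close(): Unit = println(s"[$partition] closed")
  }

  def main(args: Array[String]): Unit = {
    // (partition key, row) pairs in arrival order.
    val rows = Seq("b" -> "r3", "a" -> "r1", "b" -> "r4", "a" -> "r2")

    // Sort by partition key so each partition's rows arrive contiguously;
    // the patch does this out of core with UnsafeKVExternalSorter.
    val sorted = rows.sortBy(_._1)

    var currentKey: String = null
    var currentWriter: Writer = null
    try {
      sorted.foreach { case (key, value) =>
        if (key != currentKey) {
          // Key changed: close the previous partition's writer and open the next one.
          if (currentWriter != null) currentWriter.close()
          currentKey = key
          currentWriter = new Writer(key)
        }
        currentWriter.write(value)
      }
    } finally {
      // Always close the last writer, even if a write failed.
      if (currentWriter != null) currentWriter.close()
    }
  }
}
```

Because rows arrive grouped by partition, the number of open files no longer depends on how many distinct partitions a task sees, which is why the `maxOpenFiles` limit and its fallback path could be removed.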
@@ -406,18 +353,5 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
         path,
         Reporter.NULL)
     }
-
-    def clearOutputWriters(): Unit = {
-      outputWriters.values.foreach(_.close(false))
-      outputWriters.clear()
-    }
-
-    def abortTask(): Unit = {
-      try {
-        clearOutputWriters()
-      } finally {
-        super.abortTask()
-      }
-    }
   }
 }