@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
-import scala.collection.mutable
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -110,7 +108,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         !exists
       }
     // If we are appending data to an existing dir.
-    val isAppend = (pathExists) && (mode == SaveMode.Append)
+    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
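
For context, the flag computed in the hunk above only treats the write as an append when the destination path already exists and the caller requested SaveMode.Append. A hedged, standalone sketch of just that predicate (SaveMode is Spark's real enum; the helper name and printed cases are made up for illustration):

import org.apache.spark.sql.SaveMode

object AppendFlagSketch {
  // Mirrors the flag in the hunk above: append only when the path exists
  // and the requested mode is Append.
  def isAppend(pathExists: Boolean, mode: SaveMode): Boolean =
    pathExists && (mode == SaveMode.Append)

  def main(args: Array[String]): Unit = {
    println(isAppend(pathExists = true, SaveMode.Append))    // true
    println(isAppend(pathExists = false, SaveMode.Append))   // false: nothing to append to yet
    println(isAppend(pathExists = true, SaveMode.Overwrite)) // false: existing data gets replaced
  }
}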
@@ -142,9 +140,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
-    Seq.empty[InternalRow]
+    Seq.empty[Row]
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   */
   private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
     // Uses local vals for serialization
     val needsConversion = relation.needConversion
@@ -188,6 +189,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
     }
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   */
   private def insertWithDynamicPartitions(
       sqlContext: SQLContext,
       writerContainer: BaseWriterContainer,
@@ -497,13 +501,14 @@ private[sql] class DynamicPartitionWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
-  @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
   override protected def initWriters(): Unit = {
-    outputWriters = mutable.Map.empty[String, OutputWriter]
+    outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = {
+    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
     val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
       val string = if (rawValue == null) null else String.valueOf(rawValue)
       val valueString = if (string == null || string.isEmpty) {
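
The switch from mutable.Map to java.util.HashMap above pairs with the lookup rewrite in the next hunk: getOrElseUpdate takes its default by name, so the old code typically allocated a thunk for the writer-creation block on every row, even on cache hits. A minimal sketch of the two lookup styles, assuming nothing beyond the standard library (plain strings stand in for OutputWriter instances):

import scala.collection.mutable

object LookupCostSketch {
  def main(args: Array[String]): Unit = {
    val scalaCache = mutable.Map.empty[String, String]
    val javaCache = new java.util.HashMap[String, String]

    // Old pattern: the second argument is by-name, so the compiler typically
    // wraps it in a Function0 at every call site, hit or miss.
    def viaScala(key: String): String =
      scalaCache.getOrElseUpdate(key, "writer-for-" + key)

    // New pattern: a plain get, and a put only on a miss; no closure per call.
    def viaJava(key: String): String = {
      val existing = javaCache.get(key)
      if (existing ne null) existing
      else {
        val created = "writer-for-" + key
        javaCache.put(key, created)
        created
      }
    }

    println(viaScala("year=2015"))
    println(viaJava("year=2015"))
  }
}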
@@ -514,18 +519,23 @@ private[sql] class DynamicPartitionWriterContainer(
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
 
-    outputWriters.getOrElseUpdate(partitionPath, {
+    val writer = outputWriters.get(partitionPath)
+    if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path",
+      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-    })
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
   }
 
   private def clearOutputWriters(): Unit = {
-    if (outputWriters.nonEmpty) {
-      outputWriters.values.foreach(_.close())
+    if (!outputWriters.isEmpty) {
+      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
+      iter.foreach(_.close())
       outputWriters.clear()
     }
   }
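
The clearOutputWriters change iterates the Java values view through scala.collection.JavaConversions before closing each cached writer. A self-contained sketch of that close-and-clear pattern, with a hypothetical DummyWriter in place of OutputWriter:

import java.util.HashMap
import scala.collection.JavaConversions

object CloseAllSketch {
  // Hypothetical writer with nothing but a close() method.
  final class DummyWriter(name: String) {
    def close(): Unit = println(s"closed $name")
  }

  def main(args: Array[String]): Unit = {
    val writers = new HashMap[String, DummyWriter]
    writers.put("year=2015/month=06", new DummyWriter("a"))
    writers.put("year=2015/month=07", new DummyWriter("b"))

    if (!writers.isEmpty) {
      // Same shape as clearOutputWriters in the patch: wrap the Java iterator,
      // close every cached writer, then drop the references.
      val iter = JavaConversions.asScalaIterator(writers.values().iterator())
      iter.foreach(_.close())
      writers.clear()
    }
  }
}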