From a3186454ff5898f306e8e3ca8e511381b5cb1bff Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 14 Aug 2015 15:28:07 +0800
Subject: [PATCH] Fixes HadoopFsRelation speculative writes

---
 .../sql/execution/datasources/json/JSONRelation.scala     | 11 +++++++----
 .../datasources/parquet/ParquetRelation.scala             |  6 ++++--
 .../org/apache/spark/sql/hive/orc/OrcRelation.scala       |  5 ++++-
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 114c8b211891e..a4ac25029cb74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -169,18 +169,21 @@ private[json] class JsonOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter with SparkHadoopMapRedUtil with Logging {
 
-  val writer = new CharArrayWriter()
+  private val writer = new CharArrayWriter()
+
   // create the Generator without separator inserted between 2 records
-  val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
-  val result = new Text()
+  private val result = new Text()
 
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
         val split = context.getTaskAttemptID.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        file.getFileSystem(context.getConfiguration).delete(file, false)
+        file
       }
     }.getRecordWriter(context)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 52fac18ba187a..b4313843649d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -40,7 +41,6 @@ import org.apache.parquet.{Log => ParquetLog}
 import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
@@ -82,7 +82,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
         val split = context.getTaskAttemptID.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        file.getFileSystem(context.getConfiguration).delete(file, false)
+        file
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 9f4f8b5789afe..6b8d39ae8c1ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -113,9 +113,12 @@ private[orc] class OrcOutputWriter(
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val partition = context.getTaskAttemptID.getTaskID.getId
     val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
+    val filePath = new Path(path, filename)
+    val fs = filePath.getFileSystem(conf)
+    fs.delete(filePath, false)
 
     new OrcOutputFormat().getRecordWriter(
-      new Path(path, filename).getFileSystem(conf),
+      fs,
       conf.asInstanceOf[JobConf],
       new Path(path, filename).toString,
       Reporter.NULL