
Commit a318645

Fixes HadoopFsRelation speculative writes
1 parent 6993031
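All three output writers pick up the same fix: before a task attempt opens its record writer, it deletes any part file already sitting at its deterministic output path, for example one left behind by an earlier speculative or failed attempt of the same task, so the new attempt can safely write to that file name. Below is a minimal sketch of the pattern, assuming the same spark.sql.sources.writeJobUUID naming scheme shown in the diffs; the helper name staleFreeWorkFile is illustrative and not part of the commit.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Sketch only: compute the deterministic part-file path for this task attempt
// and remove any stale copy before handing the path to the output format.
def staleFreeWorkFile(path: String, context: TaskAttemptContext, extension: String): Path = {
  val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
  val split = context.getTaskAttemptID.getTaskID.getId
  val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
  // FileSystem.delete(path, recursive = false) returns false when the file
  // does not exist, so the common first-attempt case is unaffected.
  file.getFileSystem(context.getConfiguration).delete(file, false)
  file
}

In the diffs below, the JSON and Parquet writers apply this inside their getDefaultWorkFile overrides, while the ORC writer deletes the file just before constructing its RecordWriter and reuses the FileSystem handle it already obtained.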

3 files changed: +15 -7 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala

Lines changed: 7 additions & 4 deletions

@@ -169,18 +169,21 @@ private[json] class JsonOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter with SparkHadoopMapRedUtil with Logging {
 
-  val writer = new CharArrayWriter()
+  private val writer = new CharArrayWriter()
+
   // create the Generator without separator inserted between 2 records
-  val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
-  val result = new Text()
+  private val result = new Text()
 
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
         val split = context.getTaskAttemptID.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        file.getFileSystem(context.getConfiguration).delete(file, false)
+        file
       }
     }.getRecordWriter(context)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 4 additions & 2 deletions

@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -40,7 +41,6 @@ import org.apache.parquet.{Log => ParquetLog}
 import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
@@ -82,7 +82,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
         val split = context.getTaskAttemptID.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        file.getFileSystem(context.getConfiguration).delete(file, false)
+        file
       }
     }
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 4 additions & 1 deletion

@@ -113,9 +113,12 @@ private[orc] class OrcOutputWriter(
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val partition = context.getTaskAttemptID.getTaskID.getId
     val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
+    val filePath = new Path(path, filename)
+    val fs = filePath.getFileSystem(conf)
+    fs.delete(filePath, false)
 
     new OrcOutputFormat().getRecordWriter(
-      new Path(path, filename).getFileSystem(conf),
+      fs,
       conf.asInstanceOf[JobConf],
       new Path(path, filename).toString,
       Reporter.NULL
