@@ -169,18 +169,21 @@ private[json] class JsonOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter with SparkHadoopMapRedUtil with Logging {
 
-  val writer = new CharArrayWriter()
+  private val writer = new CharArrayWriter()
 
   // create the Generator without separator inserted between 2 records
-  val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
-  val result = new Text()
+  private val result = new Text()
 
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
         val split = context.getTaskAttemptID.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        file.getFileSystem(context.getConfiguration).delete(file, false)
+        file
       }
     }.getRecordWriter(context)
   }
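
Note on the `JsonOutputWriter` hunk above: `getDefaultWorkFile` now deletes whatever already sits at the target path before handing it to the record writer. The part-file name is deterministic for a given task ID and write-job UUID, so a retried or speculative attempt lands on the same path as an earlier one, and clearing it first keeps leftover output from colliding with the new write. A minimal self-contained sketch of this delete-before-write pattern, assuming Hadoop's `mapreduce` API (the class name `OverwritingTextOutputFormat` is illustrative, not from this PR):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Illustrative only: an output format whose work file name is derived from
// the task ID and a job-wide UUID, and which clears any stale file first.
class OverwritingTextOutputFormat(outputDir: String)
  extends TextOutputFormat[NullWritable, Text] {

  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
    val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
    val split = context.getTaskAttemptID.getTaskID.getId
    val file = new Path(outputDir, f"part-r-$split%05d-$uniqueWriteJobId$extension")
    // Non-recursive delete; returns false when the file is absent,
    // so it is safe to call unconditionally.
    file.getFileSystem(context.getConfiguration).delete(file, false)
    file
  }
}
```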
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -40,7 +41,6 @@ import org.apache.parquet.{Log => ParquetLog}
 import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
@@ -82,7 +82,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
         val split = context.getTaskAttemptID.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val file = new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        file.getFileSystem(context.getConfiguration).delete(file, false)
+        file
       }
     }
   }
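
The `ParquetOutputWriter` hunk above applies the same fix. For reference, a small snippet of how a `Path` plus a Hadoop `Configuration` resolve to the right `FileSystem`, and how the non-recursive `delete` behaves when the target is absent (the path shown is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// Hypothetical output file, for illustration only.
val file = new Path("/tmp/warehouse/part-r-00000-example.parquet")

// getFileSystem picks the implementation from the path's scheme
// (local, HDFS, S3, ...), so the same call works on any supported FS.
val fs: FileSystem = file.getFileSystem(conf)

// Non-recursive delete: true if a file was removed, false if nothing was
// there; either way the subsequent write can proceed.
val existed: Boolean = fs.delete(file, false)
```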
@@ -113,9 +113,12 @@ private[orc] class OrcOutputWriter(
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val partition = context.getTaskAttemptID.getTaskID.getId
     val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
+    val filePath = new Path(path, filename)
+    val fs = filePath.getFileSystem(conf)
+    fs.delete(filePath, false)
 
     new OrcOutputFormat().getRecordWriter(
-      new Path(path, filename).getFileSystem(conf),
+      fs,
       conf.asInstanceOf[JobConf],
       new Path(path, filename).toString,
       Reporter.NULL
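
In the `OrcOutputWriter` hunk, the `Path` and `FileSystem` are additionally hoisted into `filePath` and `fs`, so the filesystem is resolved once and shared between the cleanup and the `getRecordWriter` call. (The diff still builds `new Path(path, filename)` inline for the name argument; reusing `filePath.toString` would be equivalent.) A sketch of the resulting shape as a standalone helper, with a hypothetical name that is not part of this PR:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
import org.apache.hadoop.mapred.{JobConf, Reporter}

// Hypothetical helper: build the ORC record writer for a deterministic
// per-task file name, clearing any stale file first.
def newOrcWriter(path: String, filename: String, conf: JobConf) = {
  val filePath = new Path(path, filename)
  val fs = filePath.getFileSystem(conf)
  // Returns false if nothing was there; safe to call unconditionally.
  fs.delete(filePath, false)

  // The old mapred API takes the FileSystem explicitly, so resolving it once
  // into `fs` lets the cleanup and the writer share the same instance.
  new OrcOutputFormat().getRecordWriter(fs, conf, filePath.toString, Reporter.NULL)
}
```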