@@ -21,28 +21,31 @@ import java.io.IOException
 import java.text.NumberFormat
 import java.util.Date
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.mapred._
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred._
 
+import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
  * It is based on [[SparkHadoopWriter]].
  */
-private[hive] class SparkHiveHadoopWriter(
+private[hive] class SparkHiveWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
   extends Logging
   with SparkHadoopMapRedUtil
   with Serializable {
 
   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  protected val conf = new SerializableWritable(jobConf)
 
   private var jobID = 0
   private var splitID = 0
@@ -51,152 +54,75 @@ private[hive] class SparkHiveHadoopWriter(
   private var taID: SerializableWritable[TaskAttemptID] = null
 
   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
+  @transient private lazy val committer = conf.value.getOutputCommitter
+  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient private lazy val outputFormat =
+    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
 
-  def preSetup() {
+  def driverSideSetup() {
     setIDs(0, 0, 0)
     setConfParams()
-
-    val jCtxt = getJobContext()
-    getOutputCommitter().setupJob(jCtxt)
+    committer.setupJob(jobContext)
   }
 
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
+  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
+    setIDs(jobId, splitId, attemptId)
     setConfParams()
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-" + numfmt.format(splitID) + extension
-    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
-
-    getOutputCommitter().setupTask(getTaskContext())
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      path,
-      null)
+    committer.setupTask(taskContext)
   }
 
   /**
-   * create an HiveRecordWriter. imitate the above function open()
-   * @param dynamicPartPath the relative path for dynamic partition
-   *
-   * since this function is used to create different writer for
-   * different dynamic partition.So we need a parameter dynamicPartPath
-   * and use it we can calculate a new path and pass the new path to
-   * the function HiveFileFormatUtils.getHiveRecordWriter
+   * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
+   * for writing data to a dynamic partition.
    */
-  def open(dynamicPartPath: String) {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-" + numfmt.format(splitID) + extension
-    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
-    if (outputPath == null) {
-      throw new IOException("Undefined job output-path")
-    }
-    val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/"
-    val path = new Path(workPath, outputName)
-    getOutputCommitter().setupTask(getTaskContext())
+  def open() {
     writer = HiveFileFormatUtils.getHiveRecordWriter(
       conf.value,
       fileSinkConf.getTableInfo,
       conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
       fileSinkConf,
-      path,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
       Reporter.NULL)
   }
 
-  def write(value: Writable) {
-    if (writer != null) {
-      writer.write(value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
+  protected def getOutputName: String = {
+    val numberFormat = NumberFormat.getInstance()
+    numberFormat.setMinimumIntegerDigits(5)
+    numberFormat.setGroupingUsed(false)
+    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
+    "part-" + numberFormat.format(splitID) + extension
   }
 
+  def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+
   def close() {
     // Seems the boolean value passed into close does not matter.
     writer.close(false)
   }
 
   def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-    if (cmtr.needsTaskCommit(taCtxt)) {
+    if (committer.needsTaskCommit(taskContext)) {
       try {
-        cmtr.commitTask(taCtxt)
+        committer.commitTask(taskContext)
         logInfo(taID + ": Committed")
       } catch {
         case e: IOException =>
           logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
+          committer.abortTask(taskContext)
           throw e
       }
     } else {
-      logWarning("No need to commit output of task: " + taID.value)
+      logInfo("No need to commit output of task: " + taID.value)
     }
   }
 
   def commitJob() {
-    // always ? Or if cmtr.needsTaskCommit ?
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
+    committer.commitJob(jobContext)
   }
 
   // ********* Private Functions *********
 
-  private def getOutputFormat(): HiveOutputFormat[AnyRef, Writable] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
-    }
-    format
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) {
-      jobContext = newJobContext(conf.value, jID.value)
-    }
-    jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext = newTaskAttemptContext(conf.value, taID.value)
-    }
-    taskContext
-  }
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
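The refactored container above replaces the null-checking getters with `lazy val`s, splits setup into a driver-side and an executor-side half, and routes every row through `getLocalFileWriter(row)`. Below is a minimal sketch of the intended call sequence, assuming hypothetical glue code around the container (the actual call site is not part of this diff); `writePartition`, `serialize`, `jobId`, `splitId`, and `attemptId` are placeholders for values the caller would supply.

```scala
import org.apache.hadoop.io.Writable
import org.apache.spark.sql.Row

// Hypothetical glue code (not part of this diff). `serialize` stands in for whatever
// turns a Row into a Writable for the table's SerDe; the IDs would normally come from
// the task scheduler.
def writePartition(
    container: SparkHiveWriterContainer,
    jobId: Int,
    splitId: Int,
    attemptId: Int,
    rows: Iterator[Row],
    serialize: Row => Writable): Unit = {
  container.executorSideSetup(jobId, splitId, attemptId)   // setupTask
  container.open()
  rows.foreach { row =>
    // The dynamic-partition subclass returns a per-partition writer here; the base
    // class always returns the single writer created by open().
    container.getLocalFileWriter(row).write(serialize(row))
  }
  container.close()
  container.commit()                                       // commitTask / abortTask
}

// On the driver, the job is bracketed around the per-partition writes:
//   container.driverSideSetup()   before running the tasks   (setupJob)
//   container.commitJob()         after all tasks succeed    (commitJob)
```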
@@ -216,7 +142,7 @@ private[hive] class SparkHiveHadoopWriter(
   }
 }
 
-private[hive] object SparkHiveHadoopWriter {
+private[hive] object SparkHiveWriterContainer {
   def createPathFromString(path: String, conf: JobConf): Path = {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
@@ -226,6 +152,59 @@ private[hive] object SparkHiveHadoopWriter {
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
     }
-    outputPath.makeQualified(fs)
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+}
+
+private[spark] class SparkHiveDynamicPartitionWriterContainer(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc,
+    dynamicPartColNames: Array[String],
+    defaultPartName: String)
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+
+  @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
+
+  override def open(): Unit = {
+    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+  }
+
+  override def close(): Unit = {
+    writers.values.foreach(_.close(false))
+  }
+
+  override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+    val dynamicPartPath = dynamicPartColNames
+      .zip(row.takeRight(dynamicPartColNames.length))
+      .map { case (col, rawVal) =>
+        val string = String.valueOf(rawVal)
+        s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
+      }
+      .mkString
+
+    val path = {
+      val outputPath = FileOutputFormat.getOutputPath(conf.value)
+      assert(outputPath != null, "Undefined job output-path")
+      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+      new Path(workPath, getOutputName)
+    }
+
+    def newWriter = {
+      val newFileSinkDesc = new FileSinkDesc(
+        fileSinkConf.getDirName + dynamicPartPath,
+        fileSinkConf.getTableInfo,
+        fileSinkConf.getCompressed)
+      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
+      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+      HiveFileFormatUtils.getHiveRecordWriter(
+        conf.value,
+        fileSinkConf.getTableInfo,
+        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+        newFileSinkDesc,
+        path,
+        Reporter.NULL)
+    }
+
+    writers.getOrElseUpdate(dynamicPartPath, newWriter)
   }
 }
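In the dynamic-partition container added at the end of the diff, the trailing values of each row (one per dynamic partition column) are folded into a relative `/col=value` path, and `getLocalFileWriter` lazily creates and caches one `RecordWriter` per distinct path via `writers.getOrElseUpdate`. The snippet below is a standalone illustration of that path construction only; the column names, values, and default partition name are made up for the example, and string values are used for simplicity.

```scala
// Illustration of the dynamic-partition path built in getLocalFileWriter.
// Column names, values, and the default partition name are example values.
val dynamicPartColNames = Array("year", "month")
val defaultPartName = "__HIVE_DEFAULT_PARTITION__"
val partitionValues: Seq[String] = Seq("2014", null)   // the tail of a row, one value per column

val dynamicPartPath = dynamicPartColNames
  .zip(partitionValues)
  .map { case (col, rawVal) =>
    val string = String.valueOf(rawVal)
    // Null or empty values fall back to the default partition name.
    s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
  }
  .mkString

// dynamicPartPath == "/year=2014/month=__HIVE_DEFAULT_PARTITION__"
// This string keys the writer cache and is appended to the job output path.
```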