Commit b20a3dc

Addresses @yhuai's comments
1 parent 096bbbc commit b20a3dc

File tree

2 files changed: +32 −29 lines


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 1 addition & 2 deletions
@@ -150,11 +150,11 @@ case class InsertIntoHiveTable(
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
       writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
-      writerContainer.open()
 
       iterator.foreach { row =>
         var i = 0
         while (i < fieldOIs.length) {
+          // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
         }
@@ -164,7 +164,6 @@ case class InsertIntoHiveTable(
       }
 
       writerContainer.close()
-      writerContainer.commit()
     }
   }

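To see the effect of this change on the task-side write path: the writer container now opens its writer during executorSideSetup() and commits the task inside close(), so InsertIntoHiveTable no longer calls open() or commit() itself. Below is a minimal, self-contained sketch of that lifecycle; SimpleWriterContainer, the RecordWriter trait, and the println bodies are simplified stand-ins for illustration, not the actual Spark/Hive classes.

// Simplified stand-in types; not the real Spark/Hive writer classes.
trait RecordWriter {
  def write(record: String): Unit
  def close(): Unit
}

class SimpleWriterContainer {
  private var writer: RecordWriter = _

  // The caller no longer invokes open(); setup initializes the writer itself.
  def executorSideSetup(): Unit = {
    initWriters()
  }

  protected def initWriters(): Unit = {
    writer = new RecordWriter {
      def write(record: String): Unit = println(s"wrote: $record")
      def close(): Unit = println("writer closed")
    }
  }

  def getLocalFileWriter(record: String): RecordWriter = writer

  // The caller no longer invokes commit(); close() commits the task itself.
  def close(): Unit = {
    writer.close()
    commit()
  }

  protected def commit(): Unit = println("task committed")
}

object WriteTaskSketch extends App {
  val container = new SimpleWriterContainer()
  container.executorSideSetup()
  Seq("a", "b", "c").foreach(row => container.getLocalFileWriter(row).write(row))
  container.close() // also commits the task
}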
sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala renamed to sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 31 additions & 27 deletions
@@ -71,20 +71,7 @@ private[hive] class SparkHiveWriterContainer(
     setIDs(jobId, splitId, attemptId)
     setConfParams()
     committer.setupTask(taskContext)
-  }
-
-  /**
-   * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
-   * for writing data to a dynamic partition.
-   */
-  def open() {
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
-      Reporter.NULL)
+    initWriters()
   }
 
   protected def getOutputName: String = {
@@ -100,9 +87,26 @@ private[hive] class SparkHiveWriterContainer(
   def close() {
     // Seems the boolean value passed into close does not matter.
     writer.close(false)
+    commit()
+  }
+
+  def commitJob() {
+    committer.commitJob(jobContext)
   }
 
-  def commit() {
+  protected def initWriters() {
+    // NOTE this method is executed at the executor side.
+    // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
+      Reporter.NULL)
+  }
+
+  protected def commit() {
     if (committer.needsTaskCommit(taskContext)) {
       try {
         committer.commitTask(taskContext)
@@ -118,10 +122,6 @@ private[hive] class SparkHiveWriterContainer(
       }
     }
   }
 
-  def commitJob() {
-    committer.commitJob(jobContext)
-  }
-
   // ********* Private Functions *********
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
@@ -168,12 +168,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
 
   @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
 
-  override def open(): Unit = {
+  override protected def initWriters(): Unit = {
+    // NOTE: This method is executed at the executor side.
+    // Actual writers are created for each dynamic partition on the fly.
     writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
   }
 
   override def close(): Unit = {
     writers.values.foreach(_.close(false))
+    commit()
   }
 
   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
@@ -185,20 +188,21 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       }
       .mkString
 
-    val path = {
-      val outputPath = FileOutputFormat.getOutputPath(conf.value)
-      assert(outputPath != null, "Undefined job output-path")
-      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-      new Path(workPath, getOutputName)
-    }
-
     def newWriter = {
      val newFileSinkDesc = new FileSinkDesc(
        fileSinkConf.getDirName + dynamicPartPath,
        fileSinkConf.getTableInfo,
        fileSinkConf.getCompressed)
      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+
+      val path = {
+        val outputPath = FileOutputFormat.getOutputPath(conf.value)
+        assert(outputPath != null, "Undefined job output-path")
+        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+        new Path(workPath, getOutputName)
+      }
+
      HiveFileFormatUtils.getHiveRecordWriter(
        conf.value,
        fileSinkConf.getTableInfo,

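For the dynamic-partition case, the refactored container creates writers lazily, one per partition path, and close() now both closes every writer and commits the task. The sketch below illustrates that pattern with simplified stand-in types; DynamicPartitionContainerSketch, its RecordWriter trait, and the partition-string handling are illustrative assumptions, not the real FileSinkOperator.RecordWriter API.

import scala.collection.mutable

// Simplified stand-in; not the real Hive RecordWriter interface.
trait RecordWriter {
  def write(record: String): Unit
  def close(): Unit
}

class DynamicPartitionContainerSketch {
  private var writers: mutable.HashMap[String, RecordWriter] = _

  protected def initWriters(): Unit = {
    // Actual writers are created lazily, one per dynamic partition path.
    writers = mutable.HashMap.empty[String, RecordWriter]
  }

  def executorSideSetup(): Unit = initWriters()

  // Look up (or create) the writer for the record's partition path.
  def getLocalFileWriter(partitionPath: String): RecordWriter =
    writers.getOrElseUpdate(partitionPath, newWriter(partitionPath))

  private def newWriter(partitionPath: String): RecordWriter = new RecordWriter {
    def write(record: String): Unit = println(s"$partitionPath <- $record")
    def close(): Unit = println(s"closed writer for $partitionPath")
  }

  // Closing the container closes every per-partition writer, then commits the task.
  def close(): Unit = {
    writers.values.foreach(_.close())
    commit()
  }

  protected def commit(): Unit = println("task committed")
}

object DynamicPartitionSketch extends App {
  val container = new DynamicPartitionContainerSketch()
  container.executorSideSetup()
  Seq("year=2014/a", "year=2015/b", "year=2014/c").foreach { record =>
    val partition = record.takeWhile(_ != '/')
    container.getLocalFileWriter(partition).write(record)
  }
  container.close()
}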