Commit 3fe16e8

Revert "Broadcast configuration in hiveWriterContainers (WIP hack)"
This reverts commit 480d20a.
Parent: 9e116d1
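
For context, the reverted change handed the writer containers a Broadcast[SerializableWritable[JobConf]]; after this revert they again take the JobConf directly, keep it @transient, and hold it in a SerializableWritable field so the configuration travels with the serialized container rather than through a broadcast variable. A minimal sketch of that restored pattern (the ConfCarrier class and its jobName accessor are illustrative only, not part of this commit):

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable

// Illustrative only: mirrors how SparkHiveWriterContainer carries its configuration
// after this revert. JobConf is a Hadoop Writable but not java.io.Serializable,
// so the raw reference stays @transient and a SerializableWritable copy is what
// actually gets serialized with the container.
class ConfCarrier(@transient jobConf: JobConf) extends Serializable {
  private val conf = new SerializableWritable(jobConf)

  // Executor-side code reads the configuration back through .value
  def jobName: String = conf.value.getJobName
}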

File tree: 3 files changed (+33, −49 lines)

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 1 addition & 1 deletion

@@ -43,4 +43,4 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
-}
+}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 2 additions & 4 deletions

@@ -174,14 +174,12 @@ case class InsertIntoHiveTable(

     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
-    val broadcastedConf = sc.sparkContext.broadcast(new SerializableWritable[JobConf](jobConf))

     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(
-        broadcastedConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
     } else {
-      new SparkHiveWriterContainer(broadcastedConf, fileSinkConf)
+      new SparkHiveWriterContainer(jobConf, fileSinkConf)
     }

     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 30 additions & 44 deletions

@@ -31,10 +31,9 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.hive.common.FileUtils

-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
-import org.apache.spark.{SerializableWritable, Logging, SparkHadoopWriter}
+import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.HiveShim._
@@ -45,7 +44,7 @@ import org.apache.spark.sql.types._
  * It is based on [[SparkHadoopWriter]].
  */
 private[hive] class SparkHiveWriterContainer(
-    jobConf: Broadcast[SerializableWritable[JobConf]],
+    @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -57,28 +56,22 @@ private[hive] class SparkHiveWriterContainer(
   // handler settings can be set to jobConf
   if (tableDesc != null) {
     PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
-    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf.value.value)
+    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
   }
-  @transient var conf: JobConf = jobConf.value.value
+  protected val conf = new SerializableWritable(jobConf)

   private var jobID = 0
   private var splitID = 0
   private var attemptID = 0
-
-  @transient private var jID: JobID = null
-  @transient private var taID: TaskAttemptID = null
-  private var jIDString: String = null
-  private var taskIDString: String = null
-  private var taskAttemptIDString: String = null
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null

   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient protected lazy val committer = conf.getOutputCommitter
-  /** Only used on driver side **/
-  @transient protected lazy val jobContext = newJobContext(conf, jID)
-  /** Only used on executor side */
-  @transient private lazy val taskContext = newTaskAttemptContext(conf, taID)
+  @transient protected lazy val committer = conf.value.getOutputCommitter
+  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
-    conf.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

   def driverSideSetup() {
     setIDs(0, 0, 0)
@@ -87,7 +80,6 @@ private[hive] class SparkHiveWriterContainer(
   }

   def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
-    conf = new JobConf(jobConf.value.value)
     setIDs(jobId, splitId, attemptId)
     setConfParams()
     committer.setupTask(taskContext)
@@ -98,7 +90,7 @@ private[hive] class SparkHiveWriterContainer(
     val numberFormat = NumberFormat.getInstance()
     numberFormat.setMinimumIntegerDigits(5)
     numberFormat.setGroupingUsed(false)
-    val extension = Utilities.getFileExtension(conf, fileSinkConf.getCompressed, outputFormat)
+    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
     "part-" + numberFormat.format(splitID) + extension
   }

@@ -118,11 +110,11 @@ private[hive] class SparkHiveWriterContainer(
     // NOTE this method is executed at the executor side.
     // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
     writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf,
+      conf.value,
       fileSinkConf.getTableInfo,
-      conf.getOutputValueClass.asInstanceOf[Class[Writable]],
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
       fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf, getOutputName),
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
       Reporter.NULL)
   }

@@ -135,23 +127,17 @@ private[hive] class SparkHiveWriterContainer(
     splitID = splitId
     attemptID = attemptId

-    // note: SparkHadoopWriter.createJobID may be locale-dependent because it doesn't pass a locale
-    // to date format; we should fix this so that its results is location-independent in case
-    // different cluster nodes have different locales (e.g. driver and executor may be different
-    // types of machines with different configurations).
-    jID = SparkHadoopWriter.createJobID(now, jobId)
-    taID = new TaskAttemptID(new TaskID(jID, true, splitID), attemptID)
-    jIDString = jID.toString
-    taskAttemptIDString = taID.toString
-    taskIDString = taID.getTaskID.toString
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+    taID = new SerializableWritable[TaskAttemptID](
+      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }

   private def setConfParams() {
-    conf.set("mapred.job.id", jIDString)
-    conf.set("mapred.tip.id", taskIDString)
-    conf.set("mapred.task.id", taskAttemptIDString)
-    conf.setBoolean("mapred.task.is.map", true)
-    conf.setInt("mapred.task.partition", splitID)
+    conf.value.set("mapred.job.id", jID.value.toString)
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+    conf.value.set("mapred.task.id", taID.value.toString)
+    conf.value.setBoolean("mapred.task.is.map", true)
+    conf.value.setInt("mapred.task.partition", splitID)
   }
 }

@@ -174,14 +160,14 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
 }

 private[spark] class SparkHiveDynamicPartitionWriterContainer(
-    jobConf: Broadcast[SerializableWritable[JobConf]],
+    @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {

   import SparkHiveDynamicPartitionWriterContainer._

-  private val defaultPartName = jobConf.value.value.get(
+  private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

   @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
@@ -205,10 +191,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
     // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
     // load it with loadDynamicPartitions/loadPartition/loadTable.
-    val oldMarker = jobConf.value.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
-    jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
     super.commitJob()
-    jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }

   override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = {
@@ -243,16 +229,16 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)

       val path = {
-        val outputPath = FileOutputFormat.getOutputPath(conf)
+        val outputPath = FileOutputFormat.getOutputPath(conf.value)
         assert(outputPath != null, "Undefined job output-path")
         val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
         new Path(workPath, getOutputName)
       }

       HiveFileFormatUtils.getHiveRecordWriter(
-        conf,
+        conf.value,
         fileSinkConf.getTableInfo,
-        conf.getOutputValueClass.asInstanceOf[Class[Writable]],
+        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
         newFileSinkDesc,
         path,
         Reporter.NULL)
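
The SerializableWritable wrapper above is what makes the reverted-to approach work: JobConf is a Hadoop Writable but not java.io.Serializable, so wrapping it lets the writer container and its configuration go through Java serialization on the way to executors. A small, hypothetical round-trip sketch of that property (not part of the commit; assumes Spark and Hadoop on the classpath):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable

// Hypothetical demo: Java-serialize a JobConf via SerializableWritable and read it back,
// the same mechanism the writer containers rely on after this revert.
object ConfRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new JobConf()
    conf.set("mapred.job.name", "round-trip-demo")

    // Serialize the wrapper (a bare JobConf would throw NotSerializableException).
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new SerializableWritable(conf))
    out.close()

    // Deserialize and read the property back through .value.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val restored = in.readObject().asInstanceOf[SerializableWritable[JobConf]]
    println(restored.value.get("mapred.job.name")) // round-trip-demo
  }
}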
