@@ -24,20 +24,16 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf
 
 private[hive]
@@ -46,19 +42,12 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     child: SparkPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
+    ifNotExists: Boolean) extends UnaryNode {
 
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
-  private def newSerializer(tableDesc: TableDesc): Serializer = {
-    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
-    serializer
-  }
-
   def output: Seq[Attribute] = Seq.empty
 
   private def saveAsHiveFile(
@@ -78,44 +67,10 @@ case class InsertIntoHiveTable(
       conf.value,
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
-
     writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
+    sc.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
 
-    // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
-
-      val fieldOIs = standardOI.getAllStructFieldRefs.asScala
-        .map(_.getFieldObjectInspector).toArray
-      val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
-      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
-      val outputData = new Array[Any](fieldOIs.length)
-
-      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-      val proj = FromUnsafeProjection(child.schema)
-      iterator.foreach { row =>
-        var i = 0
-        val safeRow = proj(row)
-        while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
-          i += 1
-        }
-
-        writerContainer
-          .getLocalFileWriter(safeRow, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
-      }
-
-      writerContainer.close()
-    }
   }
 
   /**
@@ -194,11 +149,21 @@ case class InsertIntoHiveTable(
 
     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(
+        jobConf,
+        fileSinkConf,
+        dynamicPartColNames,
+        child.output,
+        table)
     } else {
-      new SparkHiveWriterContainer(jobConf, fileSinkConf)
+      new SparkHiveWriterContainer(
+        jobConf,
+        fileSinkConf,
+        child.output,
+        table)
     }
 
+    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)