@@ -24,20 +24,16 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf

 private[hive]
@@ -46,19 +42,12 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     child: SparkPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
+    ifNotExists: Boolean) extends UnaryNode {

   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog

-  private def newSerializer(tableDesc: TableDesc): Serializer = {
-    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
-    serializer
-  }
-
   def output: Seq[Attribute] = Seq.empty

   private def saveAsHiveFile(
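Note on the deletions above: the newSerializer helper and the outputClass val are not dropped, they move into the writer container, which the updated code further down relies on via writerContainer.newSerializer(table.tableDesc). A minimal sketch of the relocated helper, assuming it keeps the body shown above verbatim; the hosting object here is only illustrative, since the container's side of the change lives in a different file that is not part of this diff:

  import org.apache.hadoop.hive.ql.plan.TableDesc
  import org.apache.hadoop.hive.serde2.Serializer

  // Illustrative host; in the PR this presumably becomes a member of SparkHiveWriterContainer.
  object SerializerSketch {
    def newSerializer(tableDesc: TableDesc): Serializer = {
      // The table's SerDe class is instantiated and initialized from the table properties,
      // then used as the write-side Serializer.
      val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
      serializer.initialize(null, tableDesc.getProperties)
      serializer
    }
  }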
@@ -78,44 +67,10 @@ case class InsertIntoHiveTable(
       conf.value,
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)

     writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
+    sc.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
-
-    // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
-
-      val fieldOIs = standardOI.getAllStructFieldRefs.asScala
-        .map(_.getFieldObjectInspector).toArray
-      val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
-      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
-      val outputData = new Array[Any](fieldOIs.length)
-
-      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-      val proj = FromUnsafeProjection(child.schema)
-      iterator.foreach { row =>
-        var i = 0
-        val safeRow = proj(row)
-        while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
-          i += 1
-        }
-
-        writerContainer
-          .getLocalFileWriter(safeRow, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
-      }
-
-      writerContainer.close()
-    }
   }

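Note: the executor-side loop removed above is what writerContainer.writeToFile now stands in for, so the Hive serializer, ObjectInspectors and wrapperFor conversions live with the container instead of being captured in a closure of this operator. A hedged sketch of what the relocated method plausibly looks like, reconstructed from the deleted code; only the writeToFile(TaskContext, Iterator[InternalRow]) entry point is confirmed by the new runJob call, while members such as fileSinkConf, inputSchema (the child.output passed in below), table, newSerializer, executorSideSetup, getLocalFileWriter and close are assumed to be provided by the container, which would also need to mix in HiveInspectors for wrapperFor:

  // Sketch of the method assumed to live on SparkHiveWriterContainer,
  // reconstructed from the code deleted above.
  def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
    val serializer = newSerializer(fileSinkConf.getTableInfo)
    // Standard (Java-copy) object inspector for the table's row type.
    val standardOI = ObjectInspectorUtils
      .getStandardObjectInspector(
        fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
        ObjectInspectorCopyOption.JAVA)
      .asInstanceOf[StructObjectInspector]

    val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
    val dataTypes = inputSchema.map(_.dataType).toArray   // was child.output before the move
    val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
    val outputData = new Array[Any](fieldOIs.length)

    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)

    // Unsafe rows are converted back to safe rows before being wrapped for Hive.
    val proj = FromUnsafeProjection(inputSchema.map(_.dataType))
    iterator.foreach { row =>
      val safeRow = proj(row)
      var i = 0
      while (i < fieldOIs.length) {
        outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
        i += 1
      }
      getLocalFileWriter(safeRow, table.schema)
        .write(serializer.serialize(outputData, standardOI))
    }

    close()
  }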
/**
@@ -194,11 +149,21 @@ case class InsertIntoHiveTable(

     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(
+        jobConf,
+        fileSinkConf,
+        dynamicPartColNames,
+        child.output,
+        table)
     } else {
-      new SparkHiveWriterContainer(jobConf, fileSinkConf)
+      new SparkHiveWriterContainer(
+        jobConf,
+        fileSinkConf,
+        child.output,
+        table)
     }

+    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

     val outputPath = FileOutputFormat.getOutputPath(jobConf)
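Note: both branches now hand the container everything it needs to serialize and route rows on its own: the Hadoop job configuration, the file sink descriptor, the attributes of the incoming rows, and the destination table; outputClass is likewise derived from the container's serializer rather than from a field of this operator. The constructor shapes implied by these call sites would be roughly the following (parameter names and exact types are assumptions, only the argument order is visible in this hunk):

  // Signatures implied by the call sites above; not the actual declarations,
  // which live in the writer-container source file and are outside this diff.
  private[hive] class SparkHiveWriterContainer(
      jobConf: JobConf,
      fileSinkConf: FileSinkDesc,
      inputSchema: Seq[Attribute],
      table: MetastoreRelation)

  private[hive] class SparkHiveDynamicPartitionWriterContainer(
      jobConf: JobConf,
      fileSinkConf: FileSinkDesc,
      dynamicPartColNames: Array[String],
      inputSchema: Seq[Attribute],
      table: MetastoreRelation)
    extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table)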