37 changes: 27 additions & 10 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -116,7 +116,7 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}

deserializedHadoopRDD
@@ -189,9 +189,13 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
// get the table deserializer
val tableSerDe = tableDesc.getDeserializerClass.newInstance()
tableSerDe.initialize(hconf, tableDesc.getProperties)

// fill the non partition key attributes
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
mutableRow, tableSerDe)
}
}.toSeq

@@ -261,25 +265,36 @@ private[hive] object HadoopTableReader extends HiveInspectors {
* Transform all given raw `Writable`s into `Row`s.
*
* @param iterator Iterator of all `Writable`s to be transformed
* @param deserializer The `Deserializer` associated with the input `Writable`
* @param rawDeser The `Deserializer` associated with the input `Writable`
* @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
* positions in the output schema
* @param mutableRow A reusable `MutableRow` that should be filled
* @param tableDeser Table Deserializer
* @return An `Iterator[Row]` transformed from `iterator`
*/
def fillObject(
iterator: Iterator[Writable],
deserializer: Deserializer,
rawDeser: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[Row] = {
mutableRow: MutableRow,
tableDeser: Deserializer): Iterator[Row] = {

val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
} else {
HiveShim.getConvertedOI(
rawDeser.getObjectInspector,
tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
}

val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip

// Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
// matching and branching costs per row.
/**
* Builds specific unwrappers ahead of time according to object inspector
* types to avoid pattern matching and branching costs per row.
*/
val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
_.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
Expand Down Expand Up @@ -316,9 +331,11 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
}

val converter = ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)

// Map each tuple to a row object
iterator.map { value =>
val raw = deserializer.deserialize(value)
val raw = converter.convert(rawDeser.deserialize(value))
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
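The hunk above is the core of the SPARK-5498 fix: when the partition SerDe's object inspector no longer matches the table SerDe's (for example after an ALTER TABLE CHANGE COLUMN), each deserialized row is converted to the table-side inspector before the prebuilt unwrappers run. The standalone sketch below is not part of the patch; the single `key` column and the int-to-bigint widening are illustrative assumptions. It only shows how Hive's `ObjectInspectorConverters`, as used in the hunk, performs that conversion:

```scala
import java.util.Arrays

import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, ObjectInspectorFactory, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object ConvertedOISketch {
  def main(args: Array[String]): Unit = {
    // Partition data was written while `key` was an int ...
    val partitionOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("key"),
      Arrays.asList[ObjectInspector](PrimitiveObjectInspectorFactory.javaIntObjectInspector))

    // ... but the table schema now declares `key` as bigint.
    val tableOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("key"),
      Arrays.asList[ObjectInspector](PrimitiveObjectInspectorFactory.javaLongObjectInspector))

    // The converted inspector follows the table-side types; the converter coerces each raw row.
    val convertedOI = ObjectInspectorConverters
      .getConvertedOI(partitionOI, tableOI)
      .asInstanceOf[StructObjectInspector]
    val converter = ObjectInspectorConverters.getConverter(partitionOI, convertedOI)

    val rawRow = Arrays.asList[AnyRef](Int.box(42))  // what the partition-side SerDe produced
    val converted = converter.convert(rawRow)        // now readable through the table schema
    val keyRef = convertedOI.getStructFieldRef("key")
    println(convertedOI.getStructFieldData(converted, keyRef))  // key coerced to a 64-bit value
  }
}
```

This mirrors what `fillObject` now does per partition: build the converted inspector and converter once, then run `converter.convert` on every deserialized row before the field unwrappers fill the mutable row. Hive 0.12's `getConvertedOI` additionally takes an `equalsCheck` flag, which is why the call goes through `HiveShim`, as the shim hunks further down show.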
InsertIntoHiveTableSuite.scala
@@ -33,9 +33,12 @@ import org.apache.spark.sql.hive.test.TestHive._

case class TestData(key: Int, value: String)

case class ThreeCloumntable(key: Int, value: String, key1: String)

class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
import org.apache.spark.sql.hive.test.TestHive.implicits._


val testData = TestHive.sparkContext.parallelize(
(1 to 100).map(i => TestData(i, i.toString))).toDF()

@@ -187,4 +190,43 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {

sql("DROP TABLE hiveTableWithStructValue")
}

test("SPARK-5498:partition schema does not match table schema") {
val testData = TestHive.sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString))).toDF()
testData.registerTempTable("testData")

val testDatawithNull = TestHive.sparkContext.parallelize(
(1 to 10).map(i => ThreeCloumntable(i, i.toString,null))).toDF()

val tmpDir = Files.createTempDir()
sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")

// test schema the same between partition and table
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
Contributor:

I just checked the Hive documentation. It says:

"The CASCADE|RESTRICT clause is available in Hive 0.15.0. ALTER TABLE CHANGE COLUMN with CASCADE command changes the columns of a table's metadata, and cascades the same change to all the partition metadata. RESTRICT is the default, limiting column change only to table metadata."

I guess that in Hive 0.13.1, when the table schema is changed via ALTER TABLE, only the table metadata is updated. Can you double-check whether the query above works on Hive 0.13.1?

Contributor Author:

I checked this query on Hive 0.11 and Hive 0.12 and it is OK; I will check it on Hive 0.13.1 later.
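A side note on the CASCADE|RESTRICT clause quoted above, as a hedged sketch only: it is not part of this patch, it reuses the sql(...) helper imported in this suite, and it requires a Hive version that actually supports the clause. With RESTRICT (the default) only the table metadata changes and existing partitions keep the old column type, which is exactly the mismatch this patch reconciles at read time; CASCADE would also rewrite the partition metadata:

```scala
// Default behaviour (RESTRICT): only the table metadata switches to BIGINT;
// partition metadata for ds='1' still records key as INT.
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT RESTRICT")

// CASCADE (where supported) also updates the metadata of every existing partition.
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT CASCADE")
```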

checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
testData.collect.toSeq
)

// test difference type of field
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
testData.collect.toSeq
)

// add column to table
sql("ALTER TABLE table_with_partition ADD COLUMNS(key1 string)")
checkAnswer(sql("select key,value,key1 from table_with_partition where ds='1' "),
testDatawithNull.collect.toSeq
)

// change column name to table
sql("ALTER TABLE table_with_partition CHANGE COLUMN key keynew BIGINT")
checkAnswer(sql("select keynew,value from table_with_partition where ds='1' "),
testData.collect.toSeq
)

sql("DROP TABLE table_with_partition")
}
}
HiveShim (Hive 0.12 shim)
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
@@ -210,7 +210,7 @@ private[hive] object HiveShim {

def getDataLocationPath(p: Partition) = p.getPartitionPath

def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)

def compatibilityBlackList = Seq(
"decimal_.*",
@@ -244,6 +244,12 @@
}
}

def getConvertedOI(
inputOI: ObjectInspector,
outputOI: ObjectInspector): ObjectInspector = {
ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
}

def prepareWritable(w: Writable): Writable = {
w
}
HiveShim (Hive 0.13.1 shim)
@@ -17,6 +17,7 @@

package org.apache.spark.sql.hive

import java.util
import java.util.{ArrayList => JArrayList}
import java.util.Properties
import java.rmi.server.UID
@@ -38,7 +39,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, DecimalTypeInfo, TypeInfoFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, PrimitiveObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
@@ -400,7 +401,11 @@ private[hive] object HiveShim {
Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
}
}


def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
}

/*
* Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
* is needed to initialize before serialization.