Changes from all commits (30 commits):
3344bb8  Support TimestampNTZ type in Orc file source (beliefer, Jul 30, 2021)
ef516ec  Update code (beliefer, Aug 2, 2021)
a1aa093  Update code (beliefer, Aug 2, 2021)
0bd85eb  Merge branch 'master' into SPARK-36346 (beliefer, Aug 4, 2021)
cd80e28  Store spark data types in ORC metadata. (beliefer, Aug 16, 2021)
3cb69f0  Merge branch 'SPARK-36346' of github.com:beliefer/spark into SPARK-36346 (beliefer, Aug 16, 2021)
42fa8e4  Update code (beliefer, Aug 16, 2021)
db374b9  Merge branch 'master' into SPARK-36346 (beliefer, Aug 17, 2021)
ab8d985  Update code (beliefer, Aug 17, 2021)
7d19c1e  Update code (beliefer, Aug 17, 2021)
ae3e2de  Merge branch 'master' into SPARK-36346 (beliefer, Aug 17, 2021)
693c620  Merge branch 'master' into SPARK-36346 (beliefer, Aug 18, 2021)
b0acc90  Update code (beliefer, Aug 18, 2021)
3c69b9c  Update code (beliefer, Aug 18, 2021)
fc08870  Merge branch 'master' into SPARK-36346 (beliefer, Aug 31, 2021)
528f7ad  Merge branch 'master' into SPARK-36346 (beliefer, Nov 11, 2021)
3549dba  Update code (beliefer, Nov 11, 2021)
282c648  Update code (beliefer, Nov 11, 2021)
0e0d44c  Update code (beliefer, Nov 11, 2021)
91331c7  Update code (beliefer, Nov 13, 2021)
cffe76b  Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasou… (beliefer, Nov 15, 2021)
7c273d5  Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasou… (beliefer, Nov 15, 2021)
9243f3b  Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasou… (beliefer, Nov 15, 2021)
108c924  Update sql/core/src/test/scala/org/apache/spark/sql/execution/datasou… (beliefer, Nov 15, 2021)
625bd9f  Update code (beliefer, Nov 15, 2021)
691a9a8  Update code (beliefer, Nov 15, 2021)
8e66926  Update code (beliefer, Nov 17, 2021)
51d7651  Update code (beliefer, Nov 17, 2021)
3b70990  Update code (beliefer, Nov 17, 2021)
2c213c3  Update code (beliefer, Nov 17, 2021)
File: QueryExecutionErrors.scala
@@ -1889,4 +1889,8 @@ object QueryExecutionErrors {
def hiveTableWithAnsiIntervalsError(tableName: String): Throwable = {
new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported")
}

def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = {
new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")
}
}
File: OrcAtomicColumnVector.java
@@ -27,6 +27,7 @@
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
@@ -36,6 +37,7 @@
*/
public class OrcAtomicColumnVector extends OrcColumnVector {
private final boolean isTimestamp;
private final boolean isTimestampNTZ;
private final boolean isDate;

// Column vector for each type. Only 1 is populated for any type.
@@ -54,6 +56,12 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
isTimestamp = false;
}

if (type instanceof TimestampNTZType) {
isTimestampNTZ = true;
} else {
isTimestampNTZ = false;
}

if (type instanceof DateType) {
isDate = true;
} else {
@@ -105,6 +113,8 @@ public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else if (isTimestampNTZ) {
return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
File: OrcDeserializer.scala
@@ -129,6 +129,9 @@ class OrcDeserializer(
case TimestampType => (ordinal, value) =>
updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))

case TimestampNTZType => (ordinal, value) =>
updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp]))

case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
val v = OrcShimUtils.getDecimal(value)
v.changePrecision(precision, scale)
File: OrcFileFormat.scala
@@ -42,24 +42,6 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, Utils}

private[sql] object OrcFileFormat {

def getQuotedSchemaString(dataType: DataType): String = dataType match {
case _: DayTimeIntervalType => LongType.catalogString
case _: YearMonthIntervalType => IntegerType.catalogString
case _: AtomicType => dataType.catalogString
case StructType(fields) =>
fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}")
.mkString("struct<", ",", ">")
case ArrayType(elementType, _) =>
s"array<${getQuotedSchemaString(elementType)}>"
case MapType(keyType, valueType, _) =>
s"map<${getQuotedSchemaString(keyType)},${getQuotedSchemaString(valueType)}>"
case _ => // UDT and others
dataType.catalogString
}
}

/**
* New ORC File Format based on Apache ORC.
*/
File: OrcSerializer.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.io._
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}

import org.apache.spark.sql.catalyst.InternalRow
@@ -148,6 +147,8 @@ class OrcSerializer(dataSchema: StructType) {
result.setNanos(ts.getNanos)
result

case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal))

case DecimalType.Fixed(precision, scale) =>
OrcShimUtils.getHiveDecimalWritable(precision, scale)

@@ -214,6 +215,6 @@ class OrcSerializer(dataSchema: StructType) {
* Return an ORC value object for the given Spark schema.
*/
private def createOrcValue(dataType: DataType) = {
OrcStruct.createValue(TypeDescription.fromString(OrcFileFormat.getQuotedSchemaString(dataType)))
OrcStruct.createValue(OrcUtils.orcTypeDescription(dataType))
}
}
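For context on the serializer and deserializer changes, here is a minimal end-to-end sketch. It is not part of this patch and assumes a Spark build that includes it; the output path is illustrative. Writing a column of LocalDateTime values (encoded as TIMESTAMP_NTZ) exercises the new TimestampNTZType branch in OrcSerializer via OrcUtils.toOrcNTZ, and reading the file back goes through OrcDeserializer or OrcAtomicColumnVector via OrcUtils.fromOrcNTZ.

```scala
import java.time.LocalDateTime

import org.apache.spark.sql.SparkSession

object NtzOrcRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("ntz-orc-sketch").getOrCreate()
    import spark.implicits._

    val path = "/tmp/ntz_orc_sketch" // illustrative output location
    // LocalDateTime maps to TIMESTAMP_NTZ, so the write path goes through toOrcNTZ.
    Seq(LocalDateTime.of(2021, 7, 30, 1, 2, 3)).toDF("ts")
      .write.mode("overwrite").orc(path)

    // The read path recovers TIMESTAMP_NTZ from the Catalyst type information stored in the ORC schema.
    spark.read.orc(path).printSchema() // ts: timestamp_ntz (nullable = true)
    spark.stop()
  }
}
```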
File: OrcUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.orc

import java.nio.charset.StandardCharsets.UTF_8
import java.sql.Timestamp
import java.util.Locale

import scala.collection.JavaConverters._
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
import org.apache.orc.mapred.OrcTimestamp

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -36,7 +38,8 @@ import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils}
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
@@ -196,7 +199,18 @@ object OrcUtils extends Logging {
requiredSchema: StructType,
reader: Reader,
conf: Configuration): Option[(Array[Int], Boolean)] = {
val orcFieldNames = reader.getSchema.getFieldNames.asScala
def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = {
orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach {
case (TimestampType, TimestampNTZType) =>
throw QueryExecutionErrors.cannotConvertOrcTimestampToTimestampNTZError()
case (t1: StructType, t2: StructType) => checkTimestampCompatibility(t1, t2)
case _ =>
}
}

val orcSchema = reader.getSchema
checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema)
val orcFieldNames = orcSchema.getFieldNames.asScala
val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
if (orcFieldNames.isEmpty) {
// SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -277,6 +291,7 @@
s"array<${orcTypeDescriptionString(a.elementType)}>"
case m: MapType =>
s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName
case _: DayTimeIntervalType => LongType.catalogString
case _: YearMonthIntervalType => IntegerType.catalogString
case _ => dt.catalogString
@@ -286,15 +301,23 @@
def getInnerTypeDecription(dt: DataType): Option[TypeDescription] = {
dt match {
case y: YearMonthIntervalType =>
val typeDesc = orcTypeDescription(IntegerType)
val typeDesc = new TypeDescription(TypeDescription.Category.INT)
typeDesc.setAttribute(
CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
Some(typeDesc)
case d: DayTimeIntervalType =>
val typeDesc = orcTypeDescription(LongType)
val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
typeDesc.setAttribute(
CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
Some(typeDesc)
case n: TimestampNTZType =>
val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
Some(typeDesc)
case t: TimestampType =>
val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName)
Some(typeDesc)
case _ => None
}
}
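A note on the mechanism used above: ORC exposes a single TIMESTAMP category, so the patch attaches the Catalyst type name to the TypeDescription as a type attribute (available since ORC 1.6), which is how TIMESTAMP and TIMESTAMP_NTZ stay distinguishable in the file schema. The standalone sketch below is not from the patch; the attribute key is an illustrative stand-in for the CATALYST_TYPE_ATTRIBUTE_NAME constant referenced above.

```scala
import org.apache.orc.TypeDescription

object OrcTypeAttributeSketch {
  // Illustrative key; the patch uses its CATALYST_TYPE_ATTRIBUTE_NAME constant instead.
  private val CatalystTypeKey = "example.catalyst.type"

  def main(args: Array[String]): Unit = {
    // Both types share ORC's TIMESTAMP category; the attribute carries the distinction.
    val ntz = new TypeDescription(TypeDescription.Category.TIMESTAMP)
    ntz.setAttribute(CatalystTypeKey, "timestamp_ntz")

    val ltz = new TypeDescription(TypeDescription.Category.TIMESTAMP)
    ltz.setAttribute(CatalystTypeKey, "timestamp")

    // A reader can branch on the attribute value when mapping back to a Catalyst type.
    println(ntz.getAttributeValue(CatalystTypeKey)) // timestamp_ntz
    println(ltz.getAttributeValue(CatalystTypeKey)) // timestamp
  }
}
```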
@@ -493,4 +516,17 @@ object OrcUtils extends Logging {
val orcValuesDeserializer = new OrcDeserializer(aggSchema, (0 until aggSchema.length).toArray)
orcValuesDeserializer.deserializeFromValues(aggORCValues)
}

def fromOrcNTZ(ts: Timestamp): Long = {
DateTimeUtils.millisToMicros(ts.getTime) +
(ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
}

def toOrcNTZ(micros: Long): OrcTimestamp = {
val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
result.setNanos(nanos.toInt)
result
}
}
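To make fromOrcNTZ and toOrcNTZ concrete: Spark represents TIMESTAMP_NTZ internally as microseconds since the epoch, while ORC's timestamp writable carries millisecond time plus a separate nanosecond field, so the two helpers must split and recombine the sub-millisecond part consistently. Below is a dependency-free round-trip sketch (not from the patch) that mirrors the arithmetic using java.sql.Timestamp, the parent class of OrcTimestamp, with locally defined constants standing in for DateTimeConstants.

```scala
import java.sql.Timestamp

object NtzMicrosRoundTripSketch {
  private val MicrosPerMillis = 1000L
  private val MicrosPerSecond = 1000000L
  private val MillisPerSecond = 1000L
  private val NanosPerMicros  = 1000L

  // Mirrors toOrcNTZ: split micros into whole seconds plus a nanosecond remainder.
  def microsToTimestamp(micros: Long): Timestamp = {
    val seconds = Math.floorDiv(micros, MicrosPerSecond)
    val nanos = (micros - seconds * MicrosPerSecond) * NanosPerMicros
    val ts = new Timestamp(seconds * MillisPerSecond)
    ts.setNanos(nanos.toInt)
    ts
  }

  // Mirrors fromOrcNTZ: millis back to micros, plus the sub-millisecond part of the nanos.
  def timestampToMicros(ts: Timestamp): Long =
    ts.getTime * MicrosPerMillis + (ts.getNanos / NanosPerMicros) % MicrosPerMillis

  def main(args: Array[String]): Unit = {
    val micros = 1234567891L // 1234.567891 seconds after the epoch
    assert(timestampToMicros(microsToTimestamp(micros)) == micros)
    println(s"round-tripped $micros microseconds")
  }
}
```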
File: OrcWrite.scala
@@ -23,7 +23,7 @@ import org.apache.orc.mapred.OrcStruct

import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils}
import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcOutputWriter, OrcUtils}
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -43,7 +43,7 @@ case class OrcWrite(

val conf = job.getConfiguration

conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcFileFormat.getQuotedSchemaString(dataSchema))
conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema))

conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)

File: FileBasedDataSourceSuite.scala
@@ -697,7 +697,7 @@ class FileBasedDataSourceSuite extends QueryTest
test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") {
Seq(1.0, 0.5).foreach { compressionFactor =>
withSQLConf(SQLConf.FILE_COMPRESSION_FACTOR.key -> compressionFactor.toString,
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "350") {
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "457") {
withTempPath { workDir =>
// the file size is 504 bytes
val workDirPath = workDir.getAbsolutePath
File: OrcQuerySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc
import java.io.File
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -768,6 +769,67 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
}
}
}

test("Read/write all timestamp types") {
[Review thread on this test]
Member: let's add a test that writes TimestampLTZ and reads it back with a provided schema of TimestampNTZ.
Contributor Author: OK

val data = (0 to 255).map { i =>
(new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i))
} :+ (null, null)

withOrcFile(data) { file =>
withAllOrcReaders {
checkAnswer(spark.read.orc(file), data.toDF().collect())
}
}
}

test("SPARK-36346: can't read TimestampLTZ as TimestampNTZ") {
val data = (1 to 10).map { i =>
val ts = new Timestamp(i)
Row(ts)
}
val answer = (1 to 10).map { i =>
// The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds
// as input. So here we multiply the milliseconds by NANOS_PER_MILLIS to get `nanoOfSecond`.
val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC)
Row(ts)
}
val actualSchema = StructType(Seq(StructField("time", TimestampType, false)))
val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))

withTempPath { file =>
val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
df.write.orc(file.getCanonicalPath)
withAllOrcReaders {
val msg = intercept[SparkException] {
spark.read.schema(providedSchema).orc(file.getCanonicalPath).collect()
}.getMessage
assert(msg.contains("Unable to convert timestamp of Orc to data type 'timestamp_ntz'"))
}
}
}

test("SPARK-36346: read TimestampNTZ as TimestampLTZ") {
val data = (1 to 10).map { i =>
// The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds
// as input. So here we multiply the milliseconds by NANOS_PER_MILLIS to get `nanoOfSecond`.
val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC)
Row(ts)
}
val answer = (1 to 10).map { i =>
val ts = new java.sql.Timestamp(i)
Row(ts)
}
val actualSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
val providedSchema = StructType(Seq(StructField("time", TimestampType, false)))

withTempPath { file =>
val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
df.write.orc(file.getCanonicalPath)
withAllOrcReaders {
checkAnswer(spark.read.schema(providedSchema).orc(file.getCanonicalPath), answer)
}
}
}
}

class OrcV1QuerySuite extends OrcQuerySuite {
File: OrcTest.scala
@@ -143,6 +143,13 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
spark.read.orc(file.getAbsolutePath)
}

def withAllOrcReaders(code: => Unit): Unit = {
[Review thread on this helper]
Member: Hi, all. This naming is misleading because it only tests the native ORC reader; Apache Spark provides a Hive ORC reader, too. Instead of withAllOrcReaders, let's use withAllNativeOrcReaders.

// test the row-based reader
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")(code)
// test the vectorized reader
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true")(code)
}

/**
* Takes a sequence of products `data` to generate multi-level nested
* dataframes as new test data. It tests both non-nested and nested dataframes
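Finally, a hedged usage sketch for the new helper: a hypothetical test inside a suite that extends OrcTest with SharedSparkSession, with an illustrative test name and assuming TIMESTAMP_NTZ literal support from the same Spark versions as this patch. The enclosed block runs twice, once per value of spark.sql.orc.enableVectorizedReader, so both native read paths are covered; per the review comment above, the helper may end up named withAllNativeOrcReaders, since the Hive ORC reader is not exercised.

```scala
// Hypothetical test inside an OrcTest-derived suite; `test`, `spark`, `withTempPath`
// and `withAllOrcReaders` all come from the suite's mixins.
test("round-trip a TIMESTAMP_NTZ column under both native ORC readers") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    spark.sql("SELECT TIMESTAMP_NTZ'2021-07-30 01:02:03' AS ts").write.orc(path)
    withAllOrcReaders {
      assert(spark.read.orc(path).count() == 1)
    }
  }
}
```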