From 3344bb839a278d8edb8a49177bfeebc6bdd4c71f Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 30 Jul 2021 19:15:07 +0800 Subject: [PATCH 01/23] Support TimestampNTZ type in Orc file source --- .../datasources/orc/OrcAtomicColumnVector.java | 10 ++++++++++ .../execution/datasources/orc/OrcDeserializer.scala | 3 +++ .../execution/datasources/orc/OrcFileFormat.scala | 1 + .../execution/datasources/orc/OrcSerializer.scala | 4 ++++ .../sql/execution/datasources/orc/OrcUtils.scala | 8 +++++++- .../execution/datasources/orc/OrcQuerySuite.scala | 13 ++++++++++--- 6 files changed, 35 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java index c2d8334d928c..a8c51fb23974 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java @@ -27,6 +27,7 @@ import org.apache.spark.sql.types.DateType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.sql.types.TimestampNTZType; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; import org.apache.spark.unsafe.types.UTF8String; @@ -36,6 +37,7 @@ */ public class OrcAtomicColumnVector extends OrcColumnVector { private final boolean isTimestamp; + private final boolean isTimestampNTZ; private final boolean isDate; // Column vector for each type. Only 1 is populated for any type. @@ -54,6 +56,12 @@ public class OrcAtomicColumnVector extends OrcColumnVector { isTimestamp = false; } + if (type instanceof TimestampNTZType) { + isTimestampNTZ = true; + } else { + isTimestampNTZ = false; + } + if (type instanceof DateType) { isDate = true; } else { @@ -105,6 +113,8 @@ public long getLong(int rowId) { int index = getRowIndex(rowId); if (isTimestamp) { return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index)); + } else if (isTimestampNTZ) { + return timestampData.asScratchTimestamp(index).getTime(); } else { return longData.vector[index]; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala index fa8977f23916..c223f09efef2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala @@ -113,6 +113,9 @@ class OrcDeserializer( case TimestampType => (ordinal, value) => updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp])) + case TimestampNTZType => (ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[OrcTimestamp].getTime) + case DecimalType.Fixed(precision, scale) => (ordinal, value) => val v = OrcShimUtils.getDecimal(value) v.changePrecision(precision, scale) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 85c0ff01cfba..98fbc5fae596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -44,6 +44,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} private[sql] object OrcFileFormat { def getQuotedSchemaString(dataType: DataType): String = dataType match { + case TimestampNTZType => OrcUtils.TIMESTAMP_WITH_LOCAL_TIME_ZONE case _: AtomicType => dataType.catalogString case StructType(fields) => fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala index ac32be2fe12b..69a680c83661 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala @@ -148,6 +148,10 @@ class OrcSerializer(dataSchema: StructType) { result.setNanos(ts.getNanos) result + case TimestampNTZType => (getter, ordinal) => + val result = new OrcTimestamp(getter.getLong(ordinal)) + result + case DecimalType.Fixed(precision, scale) => OrcShimUtils.getHiveDecimalWritable(precision, scale) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index a8647726fe02..673dfce85d16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -49,6 +49,8 @@ object OrcUtils extends Logging { "LZ4" -> ".lz4", "LZO" -> ".lzo") + val TIMESTAMP_WITH_LOCAL_TIME_ZONE = "timestamp with local time zone" + def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) @@ -85,10 +87,13 @@ object OrcUtils extends Logging { } private def toCatalystSchema(schema: TypeDescription): StructType = { + // The timestampNTZ type in Spark named "timestamp_ntz", but Orc is not. + val schemaStr = + schema.toString.replace(TIMESTAMP_WITH_LOCAL_TIME_ZONE, TimestampNTZType.typeName) // The Spark query engine has not completely supported CHAR/VARCHAR type yet, and here we // replace the orc CHAR/VARCHAR with STRING type. 
CharVarcharUtils.replaceCharVarcharWithStringInSchema( - CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]) + CatalystSqlParser.parseDataType(schemaStr).asInstanceOf[StructType]) } def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) @@ -230,6 +235,7 @@ object OrcUtils extends Logging { s"array<${orcTypeDescriptionString(a.elementType)}>" case m: MapType => s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>" + case TimestampNTZType => TIMESTAMP_WITH_LOCAL_TIME_ZONE case _ => dt.catalogString } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index ead2c2cf1b70..c729b5e02d0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc import java.io.File import java.nio.charset.StandardCharsets import java.sql.Timestamp +import java.time.LocalDateTime import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -49,6 +50,8 @@ case class AllDataTypesWithNonPrimitiveType( shortField: Short, byteField: Byte, booleanField: Boolean, + timestampField: Timestamp, + timestampNTZField: LocalDateTime, array: Seq[Int], arrayContainsNull: Seq[Option[Int]], map: Map[Int, Long], @@ -66,7 +69,8 @@ abstract class OrcQueryTest extends OrcTest { test("Read/write All Types") { val data = (0 to 255).map { i => - (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0) + (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0, + new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) } withOrcFile(data) { file => @@ -87,6 +91,7 @@ abstract class OrcQueryTest extends OrcTest { val data: Seq[AllDataTypesWithNonPrimitiveType] = (0 to 255).map { i => AllDataTypesWithNonPrimitiveType( s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0, + new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i), 0 until i, (0 until i).map(Option(_).filter(_ % 3 == 0)), (0 until i).map(i => i -> i.toLong).toMap, @@ -172,13 +177,15 @@ abstract class OrcQueryTest extends OrcTest { Option.empty[Long], Option.empty[Float], Option.empty[Double], - Option.empty[Boolean] + Option.empty[Boolean], + Option.empty[Timestamp], + Option.empty[LocalDateTime] ) :: Nil withOrcFile(data) { file => checkAnswer( spark.read.orc(file), - Row(Seq.fill(5)(null): _*)) + Row(Seq.fill(7)(null): _*)) } } From ef516eca57e9de3140cb44451afab31d40eb78ea Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 2 Aug 2021 12:22:04 +0800 Subject: [PATCH 02/23] Update code --- .../datasources/orc/OrcQuerySuite.scala | 53 +++++++++++++++---- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index c729b5e02d0c..0b860c537732 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -50,14 +50,15 @@ case class AllDataTypesWithNonPrimitiveType( shortField: Short, byteField: 
Byte, booleanField: Boolean, - timestampField: Timestamp, - timestampNTZField: LocalDateTime, array: Seq[Int], arrayContainsNull: Seq[Option[Int]], map: Map[Int, Long], mapValueContainsNull: Map[Int, Option[Long]], data: (Seq[Int], (Int, String))) +case class TimestampsWithNonPrimitiveType( + timestampField: Timestamp, timestampNTZField: LocalDateTime) + case class BinaryData(binaryData: Array[Byte]) case class Contact(name: String, phone: String) @@ -69,8 +70,7 @@ abstract class OrcQueryTest extends OrcTest { test("Read/write All Types") { val data = (0 to 255).map { i => - (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0, - new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) + (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0) } withOrcFile(data) { file => @@ -91,7 +91,6 @@ abstract class OrcQueryTest extends OrcTest { val data: Seq[AllDataTypesWithNonPrimitiveType] = (0 to 255).map { i => AllDataTypesWithNonPrimitiveType( s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0, - new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i), 0 until i, (0 until i).map(Option(_).filter(_ % 3 == 0)), (0 until i).map(i => i -> i.toLong).toMap, @@ -177,15 +176,13 @@ abstract class OrcQueryTest extends OrcTest { Option.empty[Long], Option.empty[Float], Option.empty[Double], - Option.empty[Boolean], - Option.empty[Timestamp], - Option.empty[LocalDateTime] + Option.empty[Boolean] ) :: Nil withOrcFile(data) { file => checkAnswer( spark.read.orc(file), - Row(Seq.fill(7)(null): _*)) + Row(Seq.fill(5)(null): _*)) } } @@ -720,6 +717,44 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } } + + test("Read/write all timestamp types") { + val data = (0 to 255).map { i => + (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) + } + + withOrcFile(data) { file => + checkAnswer( + spark.read.orc(file), + data.toDF().collect()) + } + } + + test("Read/write all timestamp types with non-primitive type") { + val data: Seq[TimestampsWithNonPrimitiveType] = (0 to 255).map { i => + TimestampsWithNonPrimitiveType( + new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) + } + + withOrcFile(data) { file => + checkAnswer( + spark.read.orc(file), + data.toDF().collect()) + } + } + + test("test for timestamp types: save and load case class RDD with `None`s as orc") { + val data = ( + Option.empty[Timestamp], + Option.empty[LocalDateTime] + ) :: Nil + + withOrcFile(data) { file => + checkAnswer( + spark.read.orc(file), + Row(Seq.fill(2)(null): _*)) + } + } } class OrcV1QuerySuite extends OrcQuerySuite { From a1aa093c778729b5a221f1c6fd307092dae05e6b Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 2 Aug 2021 14:36:09 +0800 Subject: [PATCH 03/23] Update code --- .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 +- .../org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index c50ecf7a9360..c4d93ff133f8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -194,7 +194,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } override def supportDataType(dataType: DataType): 
Boolean = dataType match { - case _: AtomicType => true + case at: AtomicType if !at.isInstanceOf[TimestampNTZType] => true case st: StructType => st.forall { f => supportDataType(f.dataType) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index e94e0b39c859..d62ea0988462 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -143,6 +143,13 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { spark.read.schema(schema).orc(orcDir).collect() }.getMessage assert(msg.contains("ORC data source does not support interval data type.")) + + msg = intercept[AnalysisException] { + val schema = StructType(StructField("a", TimestampNTZType, true) :: Nil) + spark.range(1).write.mode("overwrite").orc(orcDir) + spark.read.schema(schema).orc(orcDir).collect() + }.getMessage + assert(msg.contains("ORC data source does not support timestamp_ntz data type.")) } } From cd80e28aa7112b42a8c85cdad47bc08439e86e42 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 16 Aug 2021 16:15:46 +0800 Subject: [PATCH 04/23] Store spark data types in ORC metadata. --- .../datasources/orc/OrcFileFormat.scala | 3 +- .../datasources/orc/OrcOutputWriter.scala | 1 + .../execution/datasources/orc/OrcUtils.scala | 56 +++++++++++++------ .../scala/org/apache/spark/sql/package.scala | 5 ++ 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 98fbc5fae596..f75dfdaa3dfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -44,8 +44,6 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} private[sql] object OrcFileFormat { def getQuotedSchemaString(dataType: DataType): String = dataType match { - case TimestampNTZType => OrcUtils.TIMESTAMP_WITH_LOCAL_TIME_ZONE - case _: AtomicType => dataType.catalogString case StructType(fields) => fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}") .mkString("struct<", ",", ">") @@ -53,6 +51,7 @@ private[sql] object OrcFileFormat { s"array<${getQuotedSchemaString(elementType)}>" case MapType(keyType, valueType, _) => s"map<${getQuotedSchemaString(keyType)},${getQuotedSchemaString(valueType)}>" + case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName case _ => // UDT and others dataType.catalogString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala index 6f215737f570..846b0915e131 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala @@ -47,6 +47,7 @@ private[sql] class OrcOutputWriter( val writer = OrcFile.createWriter(filename, options) val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer) OrcUtils.addSparkVersionMetadata(writer) + OrcUtils.addSparkTypeMetadata(writer, dataSchema) recordWriter } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 673dfce85d16..bfa385af37dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -29,7 +29,7 @@ import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer} import org.apache.spark.SPARK_VERSION_SHORT import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} +import org.apache.spark.sql.{SPARK_DATA_TYPE_METADATA_KEY, SPARK_VERSION_METADATA_KEY, SparkSession} import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils} @@ -49,8 +49,6 @@ object OrcUtils extends Logging { "LZ4" -> ".lz4", "LZO" -> ".lzo") - val TIMESTAMP_WITH_LOCAL_TIME_ZONE = "timestamp with local time zone" - def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) @@ -63,23 +61,32 @@ object OrcUtils extends Logging { } def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[TypeDescription] = { + : (Option[TypeDescription], Option[StructType]) = { val fs = file.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) try { - val schema = Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader => - reader.getSchema + val (schema, structTypeOpt) = + Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader => + val schema = reader.getSchema + val metadataKeys = reader.getMetadataKeys + if (metadataKeys.contains(SPARK_DATA_TYPE_METADATA_KEY)) { + val dataTypeMetadata = + UTF_8.decode(reader.getMetadataValue(SPARK_DATA_TYPE_METADATA_KEY)).toString + (schema, Some(DataType.fromJson(dataTypeMetadata).asInstanceOf[StructType])) + } else { + (schema, None) + } } if (schema.getFieldNames.size == 0) { - None + (None, structTypeOpt) } else { - Some(schema) + (Some(schema), structTypeOpt) } } catch { case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { logWarning(s"Skipped the footer in the corrupted file: $file", e) - None + (None, None) } else { throw QueryExecutionErrors.cannotReadFooterForFileError(file, e) } @@ -87,13 +94,10 @@ object OrcUtils extends Logging { } private def toCatalystSchema(schema: TypeDescription): StructType = { - // The timestampNTZ type in Spark named "timestamp_ntz", but Orc is not. - val schemaStr = - schema.toString.replace(TIMESTAMP_WITH_LOCAL_TIME_ZONE, TimestampNTZType.typeName) // The Spark query engine has not completely supported CHAR/VARCHAR type yet, and here we // replace the orc CHAR/VARCHAR with STRING type. 
CharVarcharUtils.replaceCharVarcharWithStringInSchema( - CatalystSqlParser.parseDataType(schemaStr).asInstanceOf[StructType]) + CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]) } def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) @@ -101,7 +105,10 @@ object OrcUtils extends Logging { val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles val conf = sparkSession.sessionState.newHadoopConfWithOptions(options) files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst { - case Some(schema) => + case (_, Some(structType)) => + logDebug(s"Reading schema from file $files, got Spark schema string: $structType") + structType + case (Some(schema), None) => logDebug(s"Reading schema from file $files, got Hive schema string: $schema") toCatalystSchema(schema) } @@ -112,9 +119,11 @@ object OrcUtils extends Logging { conf: Configuration, ignoreCorruptFiles: Boolean): Option[StructType] = { readSchema(file, conf, ignoreCorruptFiles) match { - case Some(schema) => Some(toCatalystSchema(schema)) + case (_, Some(structType)) => Some(structType) + + case (Some(schema), None) => Some(toCatalystSchema(schema)) - case None => + case (None, None) => // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true. None } @@ -127,7 +136,11 @@ object OrcUtils extends Logging { def readOrcSchemasInParallel( files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = { ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile => - OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles).map(toCatalystSchema) + OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles) match { + case (_, Some(structType)) => Some(structType) + case (Some(schema), None) => Some(toCatalystSchema(schema)) + case (None, None) => None + } }.flatten } @@ -214,6 +227,13 @@ object OrcUtils extends Logging { } } + /** + * Add Spark metadata. + */ + def addSparkTypeMetadata(writer: Writer, structType: StructType): Unit = { + writer.addUserMetadata(SPARK_DATA_TYPE_METADATA_KEY, UTF_8.encode(structType.json)) + } + /** * Add a metadata specifying Spark version. */ @@ -235,7 +255,7 @@ object OrcUtils extends Logging { s"array<${orcTypeDescriptionString(a.elementType)}>" case m: MapType => s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>" - case TimestampNTZType => TIMESTAMP_WITH_LOCAL_TIME_ZONE + case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName case _ => dt.catalogString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 022fecf1ae41..86836e9b42d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -55,6 +55,11 @@ package object sql { */ private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version" + /** + * Metadata key which is used to write Spark data types in the ORC file metadata. + */ + private[sql] val SPARK_DATA_TYPE_METADATA_KEY = "org.apache.spark.dataType" + /** * Parquet/Avro file metadata key to indicate that the file was written with legacy datetime * values. 
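
Note on the approach taken in this patch: the Spark schema is serialized to JSON and stored under the ORC user-metadata key "org.apache.spark.dataType", so a reader can tell TIMESTAMP_NTZ apart from TIMESTAMP even though both are written as ORC timestamps. A minimal standalone sketch of that round trip, using the same ORC reader calls as readSchema above (the helper name and its standalone shape are illustrative only, not part of the change):

    import java.nio.charset.StandardCharsets.UTF_8
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.orc.OrcFile
    import org.apache.spark.sql.types.{DataType, StructType}

    // Read back the Spark schema written by addSparkTypeMetadata, if present.
    def readSparkSchemaFromMetadata(path: Path, conf: Configuration): Option[StructType] = {
      val reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))
      try {
        if (reader.getMetadataKeys.contains("org.apache.spark.dataType")) {
          val json = UTF_8.decode(reader.getMetadataValue("org.apache.spark.dataType")).toString
          Some(DataType.fromJson(json).asInstanceOf[StructType])
        } else {
          None // older files carry no Spark schema; fall back to the ORC schema itself
        }
      } finally {
        reader.close()
      }
    }
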
From 42fa8e412eb9f28c96b367b3630ce617d3c7f9a9 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 16 Aug 2021 18:20:32 +0800 Subject: [PATCH 05/23] Update code --- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 2 +- .../spark/sql/execution/datasources/orc/OrcUtils.scala | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 006ded36c609..d9db2301c871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -52,7 +52,7 @@ private[sql] object OrcFileFormat { case MapType(keyType, valueType, _) => s"map<${getQuotedSchemaString(keyType)},${getQuotedSchemaString(valueType)}>" case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName - case _ => // UDT and others + case _ => dataType.catalogString } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index b2e7697361ad..0b5c0ebf7c16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -228,7 +228,11 @@ object OrcUtils extends Logging { } /** - * Add Spark metadata. + * Add a metadata specifying Spark data types. + * + * Note: Spark writes Timestamp and TimestampNTZ as timestamp without time zone into ORC. + * In order to be able to distinguish between the two, we write the Spark data types + * into ORC metadata so that it can be read correctly. */ def addSparkTypeMetadata(writer: Writer, structType: StructType): Unit = { writer.addUserMetadata(SPARK_DATA_TYPE_METADATA_KEY, UTF_8.encode(structType.json)) From ab8d9854130b3312b2414da749cf1ae0d9950093 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Tue, 17 Aug 2021 12:28:31 +0800 Subject: [PATCH 06/23] Update code --- .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 4 +++- .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 0b5c0ebf7c16..4e3fe8632547 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -235,7 +235,9 @@ object OrcUtils extends Logging { * into ORC metadata so that it can be read correctly. */ def addSparkTypeMetadata(writer: Writer, structType: StructType): Unit = { - writer.addUserMetadata(SPARK_DATA_TYPE_METADATA_KEY, UTF_8.encode(structType.json)) + // Because all types in ORC can take null values, we set nullable to true. 
+ val typeJson = structType.json.replaceAll(s""""nullable":false""", s""""nullable":true""") + writer.addUserMetadata(SPARK_DATA_TYPE_METADATA_KEY, UTF_8.encode(typeJson)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 243639222f3e..4669bbfa90d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -695,7 +695,7 @@ class FileBasedDataSourceSuite extends QueryTest test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") { Seq(1.0, 0.5).foreach { compressionFactor => withSQLConf(SQLConf.FILE_COMPRESSION_FACTOR.key -> compressionFactor.toString, - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "250") { + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "357") { withTempPath { workDir => // the file size is 486 bytes val workDirPath = workDir.getAbsolutePath From 7d19c1e94bbd033e304bd9e1d7a4ad74286a38b4 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Tue, 17 Aug 2021 17:17:43 +0800 Subject: [PATCH 07/23] Update code --- .../execution/datasources/orc/OrcUtils.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 4e3fe8632547..9d103053bef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -67,21 +67,21 @@ object OrcUtils extends Logging { try { val (schema, structTypeOpt) = Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader => - val schema = reader.getSchema - val metadataKeys = reader.getMetadataKeys - if (metadataKeys.contains(SPARK_DATA_TYPE_METADATA_KEY)) { - val dataTypeMetadata = - UTF_8.decode(reader.getMetadataValue(SPARK_DATA_TYPE_METADATA_KEY)).toString - (schema, Some(DataType.fromJson(dataTypeMetadata).asInstanceOf[StructType])) + val schema = reader.getSchema + val metadataKeys = reader.getMetadataKeys + if (metadataKeys.contains(SPARK_DATA_TYPE_METADATA_KEY)) { + val dataTypeMetadata = + UTF_8.decode(reader.getMetadataValue(SPARK_DATA_TYPE_METADATA_KEY)).toString + (schema, Some(DataType.fromJson(dataTypeMetadata).asInstanceOf[StructType])) + } else { + (schema, None) + } + } + if (schema.getFieldNames.size == 0) { + (None, structTypeOpt) } else { - (schema, None) + (Some(schema), structTypeOpt) } - } - if (schema.getFieldNames.size == 0) { - (None, structTypeOpt) - } else { - (Some(schema), structTypeOpt) - } } catch { case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { From b0acc90e1fd01b0255ac312673be56e5ddee2ff7 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 18 Aug 2021 13:29:22 +0800 Subject: [PATCH 08/23] Update code --- sql/core/src/test/resources/sql-tests/inputs/timestamp.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql index a55adb060b5b..0bc77a8c971f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql @@ -22,6 +22,9 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 1); SELECT make_timestamp(1, 1, 1, 1, 1, 
60); SELECT make_timestamp(1, 1, 1, 1, 1, 61); SELECT make_timestamp(1, 1, 1, 1, 1, null); +SELECT make_timestamp(1, 1, 1, 1, 1, 59.999999); +SELECT make_timestamp(1, 1, 1, 1, 1, 99.999999); +SELECT make_timestamp(1, 1, 1, 1, 1, 999.999999); -- [SPARK-31710] TIMESTAMP_SECONDS, TIMESTAMP_MILLISECONDS and TIMESTAMP_MICROSECONDS that always create timestamp_ltz select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null); From 3c69b9cbacfb4d4fb51851e1dd1de53dd3b7f16a Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 18 Aug 2021 14:00:46 +0800 Subject: [PATCH 09/23] Update code --- sql/core/src/test/resources/sql-tests/inputs/timestamp.sql | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql index 0bc77a8c971f..a55adb060b5b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp.sql @@ -22,9 +22,6 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 1); SELECT make_timestamp(1, 1, 1, 1, 1, 60); SELECT make_timestamp(1, 1, 1, 1, 1, 61); SELECT make_timestamp(1, 1, 1, 1, 1, null); -SELECT make_timestamp(1, 1, 1, 1, 1, 59.999999); -SELECT make_timestamp(1, 1, 1, 1, 1, 99.999999); -SELECT make_timestamp(1, 1, 1, 1, 1, 999.999999); -- [SPARK-31710] TIMESTAMP_SECONDS, TIMESTAMP_MILLISECONDS and TIMESTAMP_MICROSECONDS that always create timestamp_ltz select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null); From 3549dba29186737e076438a46c9017028290416e Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Thu, 11 Nov 2021 15:03:31 +0800 Subject: [PATCH 10/23] Update code --- .../datasources/orc/OrcFileFormat.scala | 1 - .../execution/datasources/orc/OrcUtils.scala | 12 ++++--- .../datasources/orc/OrcQuerySuite.scala | 36 ++++++++++++------- .../spark/sql/hive/orc/OrcFileFormat.scala | 2 +- .../sql/hive/orc/HiveOrcSourceSuite.scala | 7 ---- 5 files changed, 32 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 9ed6714ee6db..6fda96750959 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -47,7 +47,6 @@ private[sql] object OrcFileFormat { def getQuotedSchemaString(dataType: DataType): String = dataType match { case _: DayTimeIntervalType => LongType.catalogString case _: YearMonthIntervalType => IntegerType.catalogString - case _: AtomicType => dataType.catalogString case StructType(fields) => fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}") .mkString("struct<", ",", ">") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 0eb13657dc57..953d2dfdb904 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -84,11 +84,11 @@ object OrcUtils extends Logging { (schema, None) } } - if (schema.getFieldNames.size == 0) { - (None, structTypeOpt) - } else { - (Some(schema), structTypeOpt) - } + if (schema.getFieldNames.size == 0) { + (None, 
structTypeOpt) + } else { + (Some(schema), structTypeOpt) + } } catch { case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { @@ -359,6 +359,8 @@ object OrcUtils extends Logging { case None => result.addChild(orcTypeDescription(m.valueType)) } result + case TimestampNTZType => + TypeDescription.fromString(TypeDescription.Category.TIMESTAMP.getName) case other => TypeDescription.fromString(other.catalogString) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 2068a7b53e5c..432adce53c83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -778,10 +778,14 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) } - withOrcFile(data) { file => - checkAnswer( - spark.read.orc(file), - data.toDF().collect()) + Seq("true", "false").foreach { key => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { + withOrcFile(data) { file => + checkAnswer( + spark.read.orc(file), + data.toDF().collect()) + } + } } } @@ -791,10 +795,14 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) } - withOrcFile(data) { file => - checkAnswer( - spark.read.orc(file), - data.toDF().collect()) + Seq("true", "false").foreach { key => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { + withOrcFile(data) { file => + checkAnswer( + spark.read.orc(file), + data.toDF().collect()) + } + } } } @@ -804,10 +812,14 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { Option.empty[LocalDateTime] ) :: Nil - withOrcFile(data) { file => - checkAnswer( - spark.read.orc(file), - Row(Seq.fill(2)(null): _*)) + Seq("true", "false").foreach { key => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { + withOrcFile(data) { file => + checkAnswer( + spark.read.orc(file), + Row(Seq.fill(2)(null): _*)) + } + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 59d0f3932ccd..40924ecbc934 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -194,7 +194,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } override def supportDataType(dataType: DataType): Boolean = dataType match { - case _: AnsiIntervalType | _: TimestampNTZType => false + case _: AnsiIntervalType => false case _: AtomicType => true diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 74f2ec16e1cc..574281a65775 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -143,13 +143,6 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { spark.read.schema(schema).orc(orcDir).collect() }.getMessage assert(msg.contains("ORC data source does not support interval data type.")) - - 
msg = intercept[AnalysisException] { - val schema = StructType(StructField("a", TimestampNTZType, true) :: Nil) - spark.range(1).write.mode("overwrite").orc(orcDir) - spark.read.schema(schema).orc(orcDir).collect() - }.getMessage - assert(msg.contains("ORC data source does not support timestamp_ntz data type.")) } } From 282c64834bf1629e1c627df88767958cd4cfda74 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Thu, 11 Nov 2021 15:13:29 +0800 Subject: [PATCH 11/23] Update code --- .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 953d2dfdb904..eb2c5d87ecdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -81,6 +81,8 @@ object OrcUtils extends Logging { UTF_8.decode(reader.getMetadataValue(SPARK_DATA_TYPE_METADATA_KEY)).toString (schema, Some(DataType.fromJson(dataTypeMetadata).asInstanceOf[StructType])) } else { + // If metadata keys of ORC not contains `org.apache.spark.dataType`, + // it means the timestamp written by the legacy code and we respect the ORC schema. (schema, None) } } From 0e0d44c604922c542f6dade1962cfc8057e5e298 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Thu, 11 Nov 2021 21:19:14 +0800 Subject: [PATCH 12/23] Update code --- .../datasources/orc/OrcOutputWriter.scala | 1 - .../execution/datasources/orc/OrcUtils.scala | 69 ++++++------------- .../scala/org/apache/spark/sql/package.scala | 5 -- 3 files changed, 22 insertions(+), 53 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala index 16e6a3b473b6..fe057e0ddfc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala @@ -48,7 +48,6 @@ private[sql] class OrcOutputWriter( val writer = OrcFile.createWriter(filename, options) val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer) OrcUtils.addSparkVersionMetadata(writer) - OrcUtils.addSparkTypeMetadata(writer, dataSchema) recordWriter } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index eb2c5d87ecdc..1cd43e62668e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -32,7 +32,7 @@ import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStat import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SPARK_DATA_TYPE_METADATA_KEY, SPARK_VERSION_METADATA_KEY, SparkSession} +import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -68,34 +68,23 @@ object OrcUtils extends 
Logging { } def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean) - : (Option[TypeDescription], Option[StructType]) = { + : Option[TypeDescription] = { val fs = file.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) try { - val (schema, structTypeOpt) = - Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader => - val schema = reader.getSchema - val metadataKeys = reader.getMetadataKeys - if (metadataKeys.contains(SPARK_DATA_TYPE_METADATA_KEY)) { - val dataTypeMetadata = - UTF_8.decode(reader.getMetadataValue(SPARK_DATA_TYPE_METADATA_KEY)).toString - (schema, Some(DataType.fromJson(dataTypeMetadata).asInstanceOf[StructType])) - } else { - // If metadata keys of ORC not contains `org.apache.spark.dataType`, - // it means the timestamp written by the legacy code and we respect the ORC schema. - (schema, None) - } - } + val schema = Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader => + reader.getSchema + } if (schema.getFieldNames.size == 0) { - (None, structTypeOpt) + None } else { - (Some(schema), structTypeOpt) + Some(schema) } } catch { case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { logWarning(s"Skipped the footer in the corrupted file: $file", e) - (None, None) + None } else { throw QueryExecutionErrors.cannotReadFooterForFileError(file, e) } @@ -154,10 +143,7 @@ object OrcUtils extends Logging { val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles val conf = sparkSession.sessionState.newHadoopConfWithOptions(options) files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst { - case (_, Some(structType)) => - logDebug(s"Reading schema from file $files, got Spark schema string: $structType") - structType - case (Some(schema), None) => + case Some(schema) => logDebug(s"Reading schema from file $files, got Hive schema string: $schema") toCatalystSchema(schema) } @@ -168,11 +154,9 @@ object OrcUtils extends Logging { conf: Configuration, ignoreCorruptFiles: Boolean): Option[StructType] = { readSchema(file, conf, ignoreCorruptFiles) match { - case (_, Some(structType)) => Some(structType) - - case (Some(schema), None) => Some(toCatalystSchema(schema)) + case Some(schema) => Some(toCatalystSchema(schema)) - case (None, None) => + case None => // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true. None } @@ -185,11 +169,7 @@ object OrcUtils extends Logging { def readOrcSchemasInParallel( files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = { ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile => - OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles) match { - case (_, Some(structType)) => Some(structType) - case (Some(schema), None) => Some(toCatalystSchema(schema)) - case (None, None) => None - } + OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles).map(toCatalystSchema) }.flatten } @@ -276,19 +256,6 @@ object OrcUtils extends Logging { } } - /** - * Add a metadata specifying Spark data types. - * - * Note: Spark writes Timestamp and TimestampNTZ as timestamp without time zone into ORC. - * In order to be able to distinguish between the two, we write the Spark data types - * into ORC metadata so that it can be read correctly. - */ - def addSparkTypeMetadata(writer: Writer, structType: StructType): Unit = { - // Because all types in ORC can take null values, we set nullable to true. 
- val typeJson = structType.json.replaceAll(s""""nullable":false""", s""""nullable":true""") - writer.addUserMetadata(SPARK_DATA_TYPE_METADATA_KEY, UTF_8.encode(typeJson)) - } - /** * Add a metadata specifying Spark version. */ @@ -329,6 +296,16 @@ object OrcUtils extends Logging { typeDesc.setAttribute( CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName) Some(typeDesc) + case n: TimestampNTZType => + val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP) + typeDesc.setAttribute( + CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName) + Some(typeDesc) + case t: TimestampType => + val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP) + typeDesc.setAttribute( + CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName) + Some(typeDesc) case _ => None } } @@ -361,8 +338,6 @@ object OrcUtils extends Logging { case None => result.addChild(orcTypeDescription(m.valueType)) } result - case TimestampNTZType => - TypeDescription.fromString(TypeDescription.Category.TIMESTAMP.getName) case other => TypeDescription.fromString(other.catalogString) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 86836e9b42d4..022fecf1ae41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -55,11 +55,6 @@ package object sql { */ private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version" - /** - * Metadata key which is used to write Spark data types in the ORC file metadata. - */ - private[sql] val SPARK_DATA_TYPE_METADATA_KEY = "org.apache.spark.dataType" - /** * Parquet/Avro file metadata key to indicate that the file was written with legacy datetime * values. From 91331c76a9f211ce1ec91aeb3d95d082de8e6f98 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Sat, 13 Nov 2021 09:39:38 +0800 Subject: [PATCH 13/23] Update code --- .../datasources/orc/OrcFileFormat.scala | 18 ---------- .../datasources/orc/OrcSerializer.scala | 5 ++- .../execution/datasources/orc/OrcUtils.scala | 4 +-- .../datasources/v2/orc/OrcWrite.scala | 4 +-- .../datasources/orc/OrcQuerySuite.scala | 36 +------------------ 5 files changed, 7 insertions(+), 60 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 6fda96750959..ce851c58cc4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -42,24 +42,6 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, Utils} -private[sql] object OrcFileFormat { - - def getQuotedSchemaString(dataType: DataType): String = dataType match { - case _: DayTimeIntervalType => LongType.catalogString - case _: YearMonthIntervalType => IntegerType.catalogString - case StructType(fields) => - fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}") - .mkString("struct<", ",", ">") - case ArrayType(elementType, _) => - s"array<${getQuotedSchemaString(elementType)}>" - case MapType(keyType, valueType, _) => - s"map<${getQuotedSchemaString(keyType)},${getQuotedSchemaString(valueType)}>" - case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName - case _ => - dataType.catalogString - } -} - /** * New ORC File Format based on Apache ORC. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala index 6d833f96eb4e..8d500d44b4bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala @@ -149,8 +149,7 @@ class OrcSerializer(dataSchema: StructType) { result case TimestampNTZType => (getter, ordinal) => - val result = new OrcTimestamp(getter.getLong(ordinal)) - result + new OrcTimestamp(getter.getLong(ordinal)) case DecimalType.Fixed(precision, scale) => OrcShimUtils.getHiveDecimalWritable(precision, scale) @@ -218,6 +217,6 @@ class OrcSerializer(dataSchema: StructType) { * Return a Orc value object for the given Spark schema. */ private def createOrcValue(dataType: DataType) = { - OrcStruct.createValue(TypeDescription.fromString(OrcFileFormat.getQuotedSchemaString(dataType))) + OrcStruct.createValue(TypeDescription.fromString(OrcUtils.orcTypeDescriptionString(dataType))) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 1cd43e62668e..95a7d31f939b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -287,12 +287,12 @@ object OrcUtils extends Logging { def getInnerTypeDecription(dt: DataType): Option[TypeDescription] = { dt match { case y: YearMonthIntervalType => - val typeDesc = orcTypeDescription(IntegerType) + val typeDesc = new TypeDescription(TypeDescription.Category.INT) typeDesc.setAttribute( CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName) Some(typeDesc) case d: DayTimeIntervalType => - val typeDesc = orcTypeDescription(LongType) + val typeDesc = new TypeDescription(TypeDescription.Category.LONG) typeDesc.setAttribute( CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName) Some(typeDesc) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala index 286e87108053..1ac9266e8d5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala @@ -23,7 +23,7 @@ import org.apache.orc.mapred.OrcStruct import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils} +import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcOutputWriter, OrcUtils} import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -43,7 +43,7 @@ case class OrcWrite( val conf = job.getConfiguration - conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcFileFormat.getQuotedSchemaString(dataSchema)) + conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema)) conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 432adce53c83..a690a1d5a994 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -776,24 +776,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { test("Read/write all timestamp types") { val data = (0 to 255).map { i => (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) - } - - Seq("true", "false").foreach { key => - withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { - withOrcFile(data) { file => - checkAnswer( - spark.read.orc(file), - data.toDF().collect()) - } - } - } - } - - test("Read/write all timestamp types with non-primitive type") { - val data: Seq[TimestampsWithNonPrimitiveType] = (0 to 255).map { i => - TimestampsWithNonPrimitiveType( - new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) - } + } :+ (null, null) Seq("true", "false").foreach { key => withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { @@ -805,23 +788,6 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } } - - test("test for timestamp types: save and load case class RDD with `None`s as orc") { - val data = ( - Option.empty[Timestamp], - Option.empty[LocalDateTime] - ) :: Nil - - Seq("true", "false").foreach { key => - withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { - withOrcFile(data) { file => - checkAnswer( - spark.read.orc(file), - Row(Seq.fill(2)(null): _*)) - } - } - } - } } class OrcV1QuerySuite extends OrcQuerySuite { From cffe76ba0c6bcfa1423c520c38a48b366ef2cc1c Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 15 Nov 2021 20:02:37 +0800 Subject: [PATCH 14/23] Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala Co-authored-by: Wenchen Fan --- .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 95a7d31f939b..bc90deeed887 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -298,8 +298,7 @@ object OrcUtils extends Logging { Some(typeDesc) case n: TimestampNTZType => val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP) - typeDesc.setAttribute( - CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName) + typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName) Some(typeDesc) case t: TimestampType => val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP) From 7c273d57d2e2c94f42c111b641d128e7e519515c Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 15 Nov 2021 20:02:53 +0800 Subject: [PATCH 15/23] Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala Co-authored-by: Wenchen Fan --- .../apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index bc90deeed887..4648ce5feb74 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -302,8 +302,7 @@ object OrcUtils extends Logging { Some(typeDesc) case t: TimestampType => val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP) - typeDesc.setAttribute( - CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName) + typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName) Some(typeDesc) case _ => None } From 9243f3b8c09eeed710668e157f26864ab573a9bd Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 15 Nov 2021 20:03:39 +0800 Subject: [PATCH 16/23] Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala Co-authored-by: Wenchen Fan --- .../spark/sql/execution/datasources/orc/OrcSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala index 8d500d44b4bd..9b1b16541e88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala @@ -217,6 +217,6 @@ class OrcSerializer(dataSchema: StructType) { * Return a Orc value object for the given Spark schema. */ private def createOrcValue(dataType: DataType) = { - OrcStruct.createValue(TypeDescription.fromString(OrcUtils.orcTypeDescriptionString(dataType))) + OrcStruct.createValue(OrcUtils.orcTypeDescription(dataType)) } } From 108c9249680e4a53b4fb90afcf37fe61541fec18 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 15 Nov 2021 20:03:52 +0800 Subject: [PATCH 17/23] Update sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala Co-authored-by: Wenchen Fan --- .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index a690a1d5a994..45c52a21e7e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -781,9 +781,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { Seq("true", "false").foreach { key => withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { withOrcFile(data) { file => - checkAnswer( - spark.read.orc(file), - data.toDF().collect()) + checkAnswer(spark.read.orc(file), data.toDF().collect()) } } } From 625bd9f7c82f9fe48aa81fd23044d08fb1708d13 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 16 Nov 2021 00:04:37 +0800 Subject: [PATCH 18/23] Update code --- .../orc/OrcAtomicColumnVector.java | 4 +-- .../datasources/orc/OrcDeserializer.scala | 5 +-- .../datasources/orc/OrcSerializer.scala | 6 +--- .../datasources/orc/OrcQuerySuite.scala | 33 +++++++++++++++---- .../execution/datasources/orc/OrcTest.scala | 7 ++++ 5 files changed, 37 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java index 
a8c51fb23974..9dd14b40c665 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java @@ -111,10 +111,8 @@ public int getInt(int rowId) { @Override public long getLong(int rowId) { int index = getRowIndex(rowId); - if (isTimestamp) { + if (isTimestamp || isTimestampNTZ) { return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index)); - } else if (isTimestampNTZ) { - return timestampData.asScratchTimestamp(index).getTime(); } else { return longData.vector[index]; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala index d1f558d0f637..88b58022688b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala @@ -126,12 +126,9 @@ class OrcDeserializer( case DateType => (ordinal, value) => updater.setInt(ordinal, OrcShimUtils.getGregorianDays(value)) - case TimestampType => (ordinal, value) => + case TimestampType | TimestampNTZType => (ordinal, value) => updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp])) - case TimestampNTZType => (ordinal, value) => - updater.setLong(ordinal, value.asInstanceOf[OrcTimestamp].getTime) - case DecimalType.Fixed(precision, scale) => (ordinal, value) => val v = OrcShimUtils.getDecimal(value) v.changePrecision(precision, scale) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala index 9b1b16541e88..e51c852fc27c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.orc import org.apache.hadoop.io._ -import org.apache.orc.TypeDescription import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp} import org.apache.spark.sql.catalyst.InternalRow @@ -142,15 +141,12 @@ class OrcSerializer(dataSchema: StructType) { // The following cases are already expensive, reusing object or not doesn't matter. 
- case TimestampType => (getter, ordinal) => + case TimestampType | TimestampNTZType => (getter, ordinal) => val ts = DateTimeUtils.toJavaTimestamp(getter.getLong(ordinal)) val result = new OrcTimestamp(ts.getTime) result.setNanos(ts.getNanos) result - case TimestampNTZType => (getter, ordinal) => - new OrcTimestamp(getter.getLong(ordinal)) - case DecimalType.Fixed(precision, scale) => OrcShimUtils.getHiveDecimalWritable(precision, scale) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 45c52a21e7e1..295937e474b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc import java.io.File import java.nio.charset.StandardCharsets import java.sql.Timestamp -import java.time.LocalDateTime +import java.time.{LocalDateTime, ZoneOffset} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -778,11 +778,32 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) } :+ (null, null) - Seq("true", "false").foreach { key => - withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> key) { - withOrcFile(data) { file => - checkAnswer(spark.read.orc(file), data.toDF().collect()) - } + withAllOrcReaders { + withOrcFile(data) { file => + checkAnswer(spark.read.orc(file), data.toDF().collect()) + } + } + } + + test("SPARK-36346: read TimestampLTZ as TimestampNTZ") { + val data = (1 to 10).map { i => + val ts = new Timestamp(i) + Row(ts) + } + val answer = (1 to 10).map { i => + // The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds + // as input. So here we multiply the millisecond value `i` by NANOS_PER_MILLIS + val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC) + Row(ts) + } + val actualSchema = StructType(Seq(StructField("time", TimestampType, false))) + val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) + + withTempPath { file => + val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema) + df.write.orc(file.getCanonicalPath) + withAllOrcReaders { + checkAnswer(spark.read.schema(providedSchema).orc(file.getCanonicalPath), answer) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index 4243318ac1dd..cd87374e8574 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -143,6 +143,13 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor spark.read.orc(file.getAbsolutePath) } + def withAllOrcReaders(code: => Unit): Unit = { + // test the row-based reader + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")(code) + // test the vectorized reader + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true")(code) + } + /** * Takes a sequence of products `data` to generate multi-level nested * dataframes as new test data. 
It tests both non-nested and nested dataframes From 691a9a86401853208a6680ab176fedc6e9aa5854 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 16 Nov 2021 00:08:06 +0800 Subject: [PATCH 19/23] Update code --- .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 295937e474b4..46c5d2879926 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -778,8 +778,8 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i)) } :+ (null, null) - withAllOrcReaders { - withOrcFile(data) { file => + withOrcFile(data) { file => + withAllOrcReaders { checkAnswer(spark.read.orc(file), data.toDF().collect()) } } From 8e66926dc1e93d4b2a0fff457b3466fd4012336c Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 17 Nov 2021 11:33:22 +0800 Subject: [PATCH 20/23] Update code --- .../orc/OrcAtomicColumnVector.java | 8 ++++++- .../datasources/orc/OrcDeserializer.scala | 8 ++++++- .../datasources/orc/OrcSerializer.scala | 15 +++++++++++- .../datasources/orc/OrcQuerySuite.scala | 23 +++++++++++++++++++ 4 files changed, 51 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java index 9dd14b40c665..d67f02208efa 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java @@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources.orc; import java.math.BigDecimal; +import java.sql.Timestamp; import org.apache.hadoop.hive.ql.exec.vector.*; import org.apache.spark.sql.catalyst.util.DateTimeUtils; +import org.apache.spark.sql.catalyst.util.DateTimeConstants; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DateType; @@ -111,8 +113,12 @@ public int getInt(int rowId) { @Override public long getLong(int rowId) { int index = getRowIndex(rowId); - if (isTimestamp || isTimestampNTZ) { + if (isTimestamp) { return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index)); + } else if (isTimestampNTZ) { + Timestamp ts = timestampData.asScratchTimestamp(index); + return DateTimeUtils.millisToMicros(ts.getTime()) + + (ts.getNanos() / DateTimeConstants.NANOS_PER_MICROS) % DateTimeConstants.MICROS_PER_MILLIS; } else { return longData.vector[index]; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala index 88b58022688b..0545094c34a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala @@ -23,6 +23,7 @@ import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp} import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_MILLIS, NANOS_PER_MICROS} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -126,9 +127,14 @@ class OrcDeserializer( case DateType => (ordinal, value) => updater.setInt(ordinal, OrcShimUtils.getGregorianDays(value)) - case TimestampType | TimestampNTZType => (ordinal, value) => + case TimestampType => (ordinal, value) => updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp])) + case TimestampNTZType => (ordinal, value) => + val ts = value.asInstanceOf[OrcTimestamp] + updater.setLong(ordinal, DateTimeUtils.millisToMicros(ts.getTime) + + (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS) + case DecimalType.Fixed(precision, scale) => (ordinal, value) => val v = OrcShimUtils.getDecimal(value) v.changePrecision(precision, scale) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala index e51c852fc27c..d261e1d9bf44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala @@ -17,12 +17,15 @@ package org.apache.spark.sql.execution.datasources.orc +import java.sql.Timestamp + import org.apache.hadoop.io._ import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_SECOND, MILLIS_PER_SECOND, NANOS_PER_MICROS} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ @@ -141,12 +144,22 @@ class OrcSerializer(dataSchema: StructType) { // The following cases are already expensive, reusing object or not doesn't matter. 
- case TimestampType | TimestampNTZType => (getter, ordinal) => + case TimestampType => (getter, ordinal) => val ts = DateTimeUtils.toJavaTimestamp(getter.getLong(ordinal)) val result = new OrcTimestamp(ts.getTime) result.setNanos(ts.getNanos) result + case TimestampNTZType => (getter, ordinal) => + val micros = getter.getLong(ordinal) + val seconds = Math.floorDiv(micros, MICROS_PER_SECOND) + val ts = new Timestamp(seconds * MILLIS_PER_SECOND) + val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS + ts.setNanos(nanos.toInt) + val result = new OrcTimestamp(ts.getTime) + result.setNanos(ts.getNanos) + result + case DecimalType.Fixed(precision, scale) => OrcShimUtils.getHiveDecimalWritable(precision, scale) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 46c5d2879926..f19d54262f47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -807,6 +807,29 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } } + + test("SPARK-36346: read TimestampNTZ as TimestampLTZ") { + val data = (1 to 10).map { i => + // The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds + // as input. So here we multiply the millisecond value `i` by NANOS_PER_MILLIS + val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC) + Row(ts) + } + val answer = (1 to 10).map { i => + val ts = new java.sql.Timestamp(i) + Row(ts) + } + val actualSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) + val providedSchema = StructType(Seq(StructField("time", TimestampType, false))) + + withTempPath { file => + val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema) + df.write.orc(file.getCanonicalPath) + withAllOrcReaders { + checkAnswer(spark.read.schema(providedSchema).orc(file.getCanonicalPath), answer) + } + } + } } class OrcV1QuerySuite extends OrcQuerySuite { From 51d7651eb4f522bfa7701bb91b546849a6b689e6 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 17 Nov 2021 13:46:54 +0800 Subject: [PATCH 21/23] Update code --- .../spark/sql/errors/QueryExecutionErrors.scala | 4 ++++ .../sql/execution/datasources/orc/OrcUtils.scala | 13 ++++++++++++- .../execution/datasources/orc/OrcQuerySuite.scala | 7 +++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 9696c3a0b6e1..fb6f379db640 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1889,4 +1889,8 @@ object QueryExecutionErrors { def hiveTableWithAnsiIntervalsError(tableName: String): Throwable = { new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported") } + + def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = { + new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 
4648ce5feb74..828b5752bd70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -196,7 +196,18 @@ object OrcUtils extends Logging { requiredSchema: StructType, reader: Reader, conf: Configuration): Option[(Array[Int], Boolean)] = { - val orcFieldNames = reader.getSchema.getFieldNames.asScala + def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = { + orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach { + case (TimestampType, TimestampNTZType) => + throw QueryExecutionErrors.cannotConvertOrcTimestampToTimestampNTZError() + case (t1: StructType, t2: StructType) => checkTimestampCompatibility(t1, t2) + case _ => + } + } + + val orcSchema = reader.getSchema + checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema) + val orcFieldNames = orcSchema.getFieldNames.asScala val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf) if (orcFieldNames.isEmpty) { // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index f19d54262f47..5b143e6b46a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -785,7 +785,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } - test("SPARK-36346: read TimestampLTZ as TimestampNTZ") { + test("SPARK-36346: can't read TimestampLTZ as TimestampNTZ") { val data = (1 to 10).map { i => val ts = new Timestamp(i) Row(ts) @@ -803,7 +803,10 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema) df.write.orc(file.getCanonicalPath) withAllOrcReaders { - checkAnswer(spark.read.schema(providedSchema).orc(file.getCanonicalPath), answer) + val msg = intercept[SparkException] { + spark.read.schema(providedSchema).orc(file.getCanonicalPath).collect() + }.getMessage + assert(msg.contains("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")) } } } From 3b70990a6484b17146ac84073e059f7ad671c351 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 17 Nov 2021 13:52:29 +0800 Subject: [PATCH 22/23] Update code --- .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 5b143e6b46a6..2d6978a81024 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -58,9 +58,6 @@ case class AllDataTypesWithNonPrimitiveType( mapValueContainsNull: Map[Int, Option[Long]], data: (Seq[Int], (Int, String))) -case class TimestampsWithNonPrimitiveType( - timestampField: Timestamp, timestampNTZField: LocalDateTime) - case class BinaryData(binaryData: Array[Byte]) case class Contact(name: String, phone: String) From 
2c213c398530c5d4d0e95fa7eedb44179fe58045 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 17 Nov 2021 21:39:30 +0800 Subject: [PATCH 23/23] Update code --- .../datasources/orc/OrcAtomicColumnVector.java | 6 +----- .../datasources/orc/OrcDeserializer.scala | 5 +---- .../datasources/orc/OrcSerializer.scala | 13 +------------ .../execution/datasources/orc/OrcUtils.scala | 18 +++++++++++++++++- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java index d67f02208efa..b4f7b9924715 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java @@ -18,12 +18,10 @@ package org.apache.spark.sql.execution.datasources.orc; import java.math.BigDecimal; -import java.sql.Timestamp; import org.apache.hadoop.hive.ql.exec.vector.*; import org.apache.spark.sql.catalyst.util.DateTimeUtils; -import org.apache.spark.sql.catalyst.util.DateTimeConstants; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DateType; @@ -116,9 +114,7 @@ public long getLong(int rowId) { if (isTimestamp) { return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index)); } else if (isTimestampNTZ) { - Timestamp ts = timestampData.asScratchTimestamp(index); - return DateTimeUtils.millisToMicros(ts.getTime()) + - (ts.getNanos() / DateTimeConstants.NANOS_PER_MICROS) % DateTimeConstants.MICROS_PER_MILLIS; + return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index)); } else { return longData.vector[index]; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala index 0545094c34a8..7ab556e330e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala @@ -23,7 +23,6 @@ import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_MILLIS, NANOS_PER_MICROS} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -131,9 +130,7 @@ class OrcDeserializer( updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp])) case TimestampNTZType => (ordinal, value) => - val ts = value.asInstanceOf[OrcTimestamp] - updater.setLong(ordinal, DateTimeUtils.millisToMicros(ts.getTime) + - (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS) + updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp])) case DecimalType.Fixed(precision, scale) => (ordinal, value) => val v = OrcShimUtils.getDecimal(value) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala index 
d261e1d9bf44..edd505273963 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala @@ -17,15 +17,12 @@ package org.apache.spark.sql.execution.datasources.orc -import java.sql.Timestamp - import org.apache.hadoop.io._ import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_SECOND, MILLIS_PER_SECOND, NANOS_PER_MICROS} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ @@ -150,15 +147,7 @@ class OrcSerializer(dataSchema: StructType) { result.setNanos(ts.getNanos) result - case TimestampNTZType => (getter, ordinal) => - val micros = getter.getLong(ordinal) - val seconds = Math.floorDiv(micros, MICROS_PER_SECOND) - val ts = new Timestamp(seconds * MILLIS_PER_SECOND) - val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS - ts.setNanos(nanos.toInt) - val result = new OrcTimestamp(ts.getTime) - result.setNanos(ts.getNanos) - result + case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal)) case DecimalType.Fixed(precision, scale) => OrcShimUtils.getHiveDecimalWritable(precision, scale) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 828b5752bd70..ec161e9e55dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.orc import java.nio.charset.StandardCharsets.UTF_8 +import java.sql.Timestamp import java.util.Locale import scala.collection.JavaConverters._ @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.serde2.io.DateWritable import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable} import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer} +import org.apache.orc.mapred.OrcTimestamp import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -36,7 +38,8 @@ import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils} +import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.SchemaMergeUtils @@ -513,4 +516,17 @@ object OrcUtils extends Logging { val orcValuesDeserializer = new 
OrcDeserializer(aggSchema, (0 until aggSchema.length).toArray) orcValuesDeserializer.deserializeFromValues(aggORCValues) } + + def fromOrcNTZ(ts: Timestamp): Long = { + DateTimeUtils.millisToMicros(ts.getTime) + + (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS + } + + def toOrcNTZ(micros: Long): OrcTimestamp = { + val seconds = Math.floorDiv(micros, MICROS_PER_SECOND) + val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS + val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND) + result.setNanos(nanos.toInt) + result + } }
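The fromOrcNTZ/toOrcNTZ helpers added above convert between Spark's TIMESTAMP_NTZ representation (microseconds since the epoch) and ORC's OrcTimestamp (milliseconds plus a nanos-of-second field). The following is a minimal self-contained Scala sketch of the same arithmetic, not part of the patch: the DateTimeConstants values are inlined as plain literals, java.sql.Timestamp stands in for org.apache.orc.mapred.OrcTimestamp, and the round trip only holds at microsecond precision, which is all Spark keeps.

// Illustrative sketch only: mirrors the arithmetic of OrcUtils.fromOrcNTZ/toOrcNTZ.
object NTZConversionSketch {
  private val MicrosPerSecond = 1000000L // DateTimeConstants.MICROS_PER_SECOND
  private val MillisPerSecond = 1000L    // DateTimeConstants.MILLIS_PER_SECOND
  private val NanosPerMicros = 1000L     // DateTimeConstants.NANOS_PER_MICROS
  private val MicrosPerMillis = 1000L    // DateTimeConstants.MICROS_PER_MILLIS

  // Micros since epoch -> Timestamp, as toOrcNTZ does: the whole seconds go into the
  // millisecond field, the sub-second remainder is carried in the nanos-of-second field.
  def microsToTimestamp(micros: Long): java.sql.Timestamp = {
    val seconds = Math.floorDiv(micros, MicrosPerSecond)
    val nanos = (micros - seconds * MicrosPerSecond) * NanosPerMicros
    val ts = new java.sql.Timestamp(seconds * MillisPerSecond)
    ts.setNanos(nanos.toInt)
    ts
  }

  // Timestamp -> micros since epoch, as fromOrcNTZ does: getTime already includes the
  // millisecond part of the nanos field, so the modulo keeps only the sub-millisecond
  // microseconds and avoids counting the milliseconds twice.
  def timestampToMicros(ts: java.sql.Timestamp): Long =
    ts.getTime * MicrosPerMillis + (ts.getNanos / NanosPerMicros) % MicrosPerMillis

  def main(args: Array[String]): Unit = {
    val micros = 1234567L // 1.234567 seconds after the epoch
    assert(timestampToMicros(microsToTimestamp(micros)) == micros)
  }
}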
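For context on the behaviour the two SPARK-36346 tests pin down, here is a hedged end-to-end sketch, not taken from the patch, of the asymmetric read-schema rules: a TIMESTAMP_NTZ column may be read back with a TIMESTAMP read schema, but requesting TIMESTAMP_NTZ over a column written as TIMESTAMP fails at read time via cannotConvertOrcTimestampToTimestampNTZError. The names `spark` (a running SparkSession) and `dir` (a writable scratch path) are assumptions of the sketch.

// Hedged usage sketch; `spark` and `dir` are assumed, everything else follows the tests above.
import java.time.LocalDateTime
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val ntzSchema = StructType(Seq(StructField("time", TimestampNTZType, nullable = false)))
val ltzSchema = StructType(Seq(StructField("time", TimestampType, nullable = false)))

// Write TIMESTAMP_NTZ, read back as TIMESTAMP: allowed, the stored microsecond value is
// reinterpreted under the TIMESTAMP read schema.
val ntzRows = spark.sparkContext.parallelize(Seq(Row(LocalDateTime.of(2019, 3, 21, 0, 2, 3))))
spark.createDataFrame(ntzRows, ntzSchema).write.mode("overwrite").orc(s"$dir/ntz")
spark.read.schema(ltzSchema).orc(s"$dir/ntz").collect()

// Write TIMESTAMP, request TIMESTAMP_NTZ: rejected, the read fails with
// "Unable to convert timestamp of Orc to data type 'timestamp_ntz'" wrapped in a SparkException.
val ltzRows = spark.sparkContext.parallelize(Seq(Row(new java.sql.Timestamp(0))))
spark.createDataFrame(ltzRows, ltzSchema).write.mode("overwrite").orc(s"$dir/ltz")
spark.read.schema(ntzSchema).orc(s"$dir/ltz").collect()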