OrcColumnarBatchReader.java
@@ -48,6 +48,9 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // The capacity of vectorized batch.
   private int capacity;
 
+  // If the ORC file being read was written by Spark 3.3 or later, use UTC timestamps.
+  private boolean useUTCTimestamp;
+
   // Vectorized ORC Row Batch wrap.
   private VectorizedRowBatchWrap wrap;

@@ -74,8 +77,9 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // The wrapped ORC column vectors.
   private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
 
-  public OrcColumnarBatchReader(int capacity) {
+  public OrcColumnarBatchReader(int capacity, boolean useUTCTimestamp) {
     this.capacity = capacity;
+    this.useUTCTimestamp = useUTCTimestamp;
   }


@@ -124,7 +128,8 @@ public void initialize(
         fileSplit.getPath(),
         OrcFile.readerOptions(conf)
             .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
-            .filesystem(fileSplit.getPath().getFileSystem(conf)));
+            .filesystem(fileSplit.getPath().getFileSystem(conf))
+            .useUTCTimestamp(useUTCTimestamp));
     Reader.Options options =
         OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
     recordReader = reader.rows(options);
OrcFileFormat.scala
@@ -142,10 +142,11 @@ class OrcFileFormat

     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val resultedColPruneInfo =
+    val (resultedColPruneInfo, isOldOrcFile) =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, requiredSchema, reader, conf)
+        (OrcUtils.requestedColumnIds(
+          isCaseSensitive, dataSchema, requiredSchema, reader, conf),
+          OrcUtils.isOldOrcFile(reader.getSchema))
       }
 
     if (resultedColPruneInfo.isEmpty) {
@@ -174,7 +175,7 @@ class OrcFileFormat
       val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
 
       if (enableVectorizedReader) {
-        val batchReader = new OrcColumnarBatchReader(capacity)
+        val batchReader = new OrcColumnarBatchReader(capacity, !isOldOrcFile)
         // SPARK-23399 Register a task completion listener first to call `close()` in all cases.
         // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
         // after opening a file.
@@ -193,8 +194,8 @@ class OrcFileFormat

         iter.asInstanceOf[Iterator[InternalRow]]
       } else {
-        val orcRecordReader = new OrcInputFormat[OrcStruct]
-          .createRecordReader(fileSplit, taskAttemptContext)
+        val orcRecordReader =
+          OrcUtils.createRecordReader[OrcStruct](fileSplit, taskAttemptContext, !isOldOrcFile)
         val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
         Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
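
Both branches above apply the same rule: whether the file predates attribute-tagged timestamps decides how timestamps are read. A condensed sketch of that decision (a sketch only; reader, capacity, fileSplit, and taskAttemptContext stand in for values built earlier in this method):

  // Files written by Spark <= 3.1 carry no Catalyst type attributes, so they
  // keep the legacy local-time read path; newer files are read as UTC.
  val useUTC = !OrcUtils.isOldOrcFile(reader.getSchema)
  if (enableVectorizedReader) {
    new OrcColumnarBatchReader(capacity, useUTC)
  } else {
    OrcUtils.createRecordReader[OrcStruct](fileSplit, taskAttemptContext, useUTC)
  }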

OrcOutputWriter.scala
@@ -45,6 +45,7 @@ private[sql] class OrcOutputWriter(
     val filename = orcOutputFormat.getDefaultWorkFile(context, ".orc")
     val options = OrcMapRedOutputFormat.buildOptions(context.getConfiguration)
     options.setSchema(OrcUtils.orcTypeDescription(dataSchema))
+    options.useUTCTimestamp(true)
     val writer = OrcFile.createWriter(filename, options)
     val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer)
     OrcUtils.addSparkVersionMetadata(writer)
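
Setting useUTCTimestamp(true) makes the ORC writer encode timestamp values against UTC instead of the JVM default zone. A minimal standalone sketch of the same write-side API (the path "example.orc" and the one-column schema are illustrative, not part of the patch):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.orc.{OrcFile, TypeDescription}

  val conf = new Configuration()
  val writerOptions = OrcFile.writerOptions(conf)
    .setSchema(TypeDescription.fromString("struct<ts:timestamp>"))
    .useUTCTimestamp(true)  // encode timestamps relative to UTC
  val writer = OrcFile.createWriter(new Path("example.orc"), writerOptions)
  writer.close()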
OrcUtils.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.serde2.io.DateWritable
 import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
-import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
+import org.apache.hadoop.mapreduce.{InputSplit, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc.{mapreduce, BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
 import org.apache.orc.mapred.OrcTimestamp
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
@@ -142,6 +144,39 @@ object OrcUtils extends Logging {
     CharVarcharUtils.replaceCharVarcharWithStringInSchema(toStructType(schema))
   }

+  /**
+   * Check whether the ORC file being read was written by Spark 3.1 or prior.
+   */
+  def isOldOrcFile(schema: TypeDescription): Boolean = {
+    import TypeDescription.Category
+
+    def find(orcType: TypeDescription): Boolean = {
+      orcType.getCategory match {
+        case Category.STRUCT => findInStruct(orcType)
+        case Category.LIST => findInArray(orcType)
+        case Category.MAP => findInMap(orcType)
+        case Category.TIMESTAMP =>
+          // Spark writers that use UTC timestamps tag TIMESTAMP columns with
+          // the Catalyst type attribute; its absence marks an old file.
+          orcType.getAttributeValue(CATALYST_TYPE_ATTRIBUTE_NAME) == null
+        case _ => false
+      }
+    }
+
+    def findInStruct(orcType: TypeDescription): Boolean =
+      orcType.getChildren.asScala.exists(find)
+
+    def findInArray(orcType: TypeDescription): Boolean =
+      find(orcType.getChildren.get(0))
+
+    def findInMap(orcType: TypeDescription): Boolean = {
+      val Seq(keyType, valueType) = orcType.getChildren.asScala.toSeq
+      find(keyType) || find(valueType)
+    }
+
+    find(schema)
+  }
+
   def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
       : Option[StructType] = {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
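
A quick illustration of the classification above (a sketch only; it assumes ORC's TypeDescription attribute API and Spark's CATALYST_TYPE_ATTRIBUTE_NAME constant):

  import org.apache.orc.TypeDescription

  // No Catalyst type attribute on the timestamp column: treated as an old file.
  val oldSchema = TypeDescription.fromString("struct<ts:timestamp>")
  assert(OrcUtils.isOldOrcFile(oldSchema))

  // Tagging the column the way newer Spark writers do flips the answer.
  val newSchema = TypeDescription.fromString("struct<ts:timestamp>")
  newSchema.getChildren.get(0)
    .setAttribute(OrcUtils.CATALYST_TYPE_ATTRIBUTE_NAME, "timestamp")
  assert(!OrcUtils.isOldOrcFile(newSchema))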
@@ -542,4 +577,24 @@ object OrcUtils extends Logging {
     result.setNanos(nanos.toInt)
     result
   }
+
+  /**
+   * Mirrors OrcInputFormat#createRecordReader, but additionally calls
+   * useUTCTimestamp on OrcFile.ReaderOptions.
+   *
+   * @return OrcMapreduceRecordReader
+   */
+  def createRecordReader[V <: WritableComparable[_]](
+      inputSplit: InputSplit,
+      taskAttemptContext: TaskAttemptContext,
+      useUTCTimestamp: Boolean): mapreduce.OrcMapreduceRecordReader[V] = {
+    val split = inputSplit.asInstanceOf[FileSplit]
+    val conf = taskAttemptContext.getConfiguration()
+    val readOptions = OrcFile.readerOptions(conf)
+      .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)).useUTCTimestamp(useUTCTimestamp)
+    val file = OrcFile.createReader(split.getPath(), readOptions)
+    val options = org.apache.orc.mapred.OrcInputFormat.buildOptions(
+      conf, file, split.getStart(), split.getLength()).useSelected(true)
+    new mapreduce.OrcMapreduceRecordReader(file, options)
+  }
 }
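
A hypothetical call-site sketch for the helper above (fileSplit and taskAttemptContext are placeholders for the values its callers construct):

  import org.apache.orc.mapred.OrcStruct

  val orcReader = OrcUtils.createRecordReader[OrcStruct](
    fileSplit, taskAttemptContext, useUTCTimestamp = true)
  try {
    while (orcReader.nextKeyValue()) {
      val row: OrcStruct = orcReader.getCurrentValue  // one ORC row per call
    }
  } finally {
    orcReader.close()
  }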
OrcPartitionReaderFactory.scala
@@ -88,10 +88,11 @@ case class OrcPartitionReaderFactory(
     }
     val filePath = new Path(new URI(file.filePath))
 
-    val resultedColPruneInfo =
+    val (resultedColPruneInfo, isOldOrcFile) =
       Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
+        (OrcUtils.requestedColumnIds(
+          isCaseSensitive, dataSchema, readDataSchema, reader, conf),
+          OrcUtils.isOldOrcFile(reader.getSchema))
       }
 
     if (resultedColPruneInfo.isEmpty) {
@@ -108,8 +109,8 @@ case class OrcPartitionReaderFactory(
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
 
-    val orcRecordReader = new OrcInputFormat[OrcStruct]
-      .createRecordReader(fileSplit, taskAttemptContext)
+    val orcRecordReader =
+      OrcUtils.createRecordReader[OrcStruct](fileSplit, taskAttemptContext, !isOldOrcFile)
     val deserializer = new OrcDeserializer(readDataSchema, requestedColIds)
     val fileReader = new PartitionReader[InternalRow] {
       override def next(): Boolean = orcRecordReader.nextKeyValue()
@@ -131,10 +132,11 @@ case class OrcPartitionReaderFactory(
     }
     val filePath = new Path(new URI(file.filePath))
 
-    val resultedColPruneInfo =
+    val (resultedColPruneInfo, isOldOrcFile) =
       Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
+        (OrcUtils.requestedColumnIds(
+          isCaseSensitive, dataSchema, readDataSchema, reader, conf),
+          OrcUtils.isOldOrcFile(reader.getSchema))
       }
 
     if (resultedColPruneInfo.isEmpty) {
@@ -152,7 +154,7 @@ case class OrcPartitionReaderFactory(
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
 
-    val batchReader = new OrcColumnarBatchReader(capacity)
+    val batchReader = new OrcColumnarBatchReader(capacity, !isOldOrcFile)
     batchReader.initialize(fileSplit, taskAttemptContext)
     val requestedPartitionColIds =
       Array.fill(readDataSchema.length)(-1) ++ Range(0, partitionSchema.length)
OrcColumnarBatchReaderSuite.scala
@@ -56,7 +56,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
       requestedDataColIds: Array[Int],
       requestedPartitionColIds: Array[Int],
       resultFields: Array[StructField]): OrcColumnarBatchReader = {
-    val reader = new OrcColumnarBatchReader(4096)
+    val reader = new OrcColumnarBatchReader(4096, true)
     reader.initBatch(
       orcFileSchema,
       resultFields,
@@ -121,7 +121,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
     val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
     val taskConf = sqlContext.sessionState.newHadoopConf()
     val orcFileSchema = TypeDescription.fromString(schema.simpleString)
-    val vectorizedReader = new OrcColumnarBatchReader(4096)
+    val vectorizedReader = new OrcColumnarBatchReader(4096, true)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)

OrcQuerySuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -830,6 +831,36 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-37463: read/write Timestamp ntz to Orc with different time zone") {
+    val localTimeZone = TimeZone.getDefault
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+
+      val sqlText = """
+          |select
+          | timestamp_ntz '2021-06-01 00:00:00' ts_ntz1,
+          | timestamp_ntz '1883-11-16 00:00:00.0' as ts_ntz2,
+          | timestamp_ntz '2021-03-14 02:15:00.0' as ts_ntz3,
+          | timestamp_ntz '1996-10-27T09:10:25.088353' as ts_ntz4
+          |""".stripMargin
+
+      val df = sql(sqlText)
+
+      df.write.mode("overwrite").orc("ts_ntz_orc")
+
+      val query = "select * from `orc`.`ts_ntz_orc`"
+
+      Seq("America/Los_Angeles", "UTC", "Europe/Amsterdam").foreach { tz =>
+        TimeZone.setDefault(TimeZone.getTimeZone(tz))
+        withAllNativeOrcReaders {
+          checkAnswer(sql(query), df)
+        }
+      }
+    } finally {
+      TimeZone.setDefault(localTimeZone)
+    }
+  }
 }

 class OrcV1QuerySuite extends OrcQuerySuite {