[SPARK-4987] [SQL] Parquet timestamp type support (#3820), closed

Changes from all commits
9 changes: 9 additions & 0 deletions docs/sql-programming-guide.md
@@ -580,6 +580,15 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.int96AsTimestamp</code></td>
<td>true</td>
<td>
Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark SQL also
stores Timestamp as INT96 to avoid losing precision in the nanoseconds field. This
flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
</td>
</tr>
Cheng Lian (Contributor):

I'm probably OK with this property. Just wanted to ask: doesn't Impala store the original type information (TIMESTAMP) together in the Parquet metadata?

Contributor:

Do they? As far as I can tell, the Parquet spec does not have a nanosecond-precision timestamp type.

Cheng Lian (Contributor):

Oh, I see the difference here. Double-checked: Parquet only provides TIMESTAMP and TIMESTAMP_MILLIS.

Michael Armbrust (Contributor):

Yeah, I agree that it's weird, though. Perhaps we should ask the Parquet list why they don't support the INT96 version.

Felix Cheung (Member):

From my digging, only Parquet-format 2.2 has the TIMESTAMP and TIMESTAMP_MILLIS types, and Cloudera is still on 1.5.0.
Hive/Impala have been writing this different INT96 nanosecond format.
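
For reference, the INT96 timestamp layout being discussed here is, by the Impala/Hive convention (also used by parquet-mr's NanoTime example class), 8 little-endian bytes of nanoseconds-of-day followed by 4 little-endian bytes of Julian day number. A minimal decoding sketch, independent of this patch (the object name and byte values are made up):

import java.nio.{ByteBuffer, ByteOrder}

// Decode a 12-byte Parquet INT96 value under the Impala/Hive convention:
//   bytes 0-7  = nanoseconds since midnight (little-endian long)
//   bytes 8-11 = Julian day number          (little-endian int)
object Int96Decode {
  def decode(int96: Array[Byte]): (Int, Long) = {
    require(int96.length == 12, "INT96 values are exactly 12 bytes")
    val buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong   // first 8 bytes
    val julianDay = buf.getInt         // last 4 bytes
    (julianDay, timeOfDayNanos)
  }

  def main(args: Array[String]): Unit = {
    // Made-up example: Julian day 2457043 (2015-01-20), 1 nanosecond past midnight.
    val bytes = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
      .putLong(1L).putInt(2457043).array()
    println(decode(bytes))  // (2457043,1)
  }
}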

<tr>
<td><code>spark.sql.parquet.cacheMetadata</code></td>
<td>true</td>
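As a usage note, the flag documented above is set like the other Parquet options, via `setConf` on a SQLContext (or a `SET key=value` SQL command). A minimal sketch against Spark 1.2-era APIs; the object name, app name and path are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Int96FlagDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("int96-flag-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // "true" is the documented default; set it explicitly here for illustration.
    sqlContext.setConf("spark.sql.parquet.int96AsTimestamp", "true")

    // INT96 columns in files written by Impala/Hive should now surface as TimestampType.
    val data = sqlContext.parquetFile("/tmp/impala_table.parquet")  // hypothetical path
    data.printSchema()

    sc.stop()
  }
}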
1 change: 1 addition & 0 deletions pom.xml
@@ -149,6 +149,7 @@
<scala.binary.version>2.10</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
<jodd.version>3.6.3</jodd.version>
<codehaus.jackson.version>1.8.8</codehaus.jackson.version>
<snappy.version>1.1.1.6</snappy.version>

5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -66,6 +66,11 @@
<artifactId>jackson-databind</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
<version>${jodd.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
val DIALECT = "spark.sql.dialect"

val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
@@ -140,6 +141,12 @@ private[sql] class SQLConf extends Serializable {
private[spark] def isParquetBinaryAsString: Boolean =
getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean

/**
* When set to true, we always treat INT96 values in Parquet files as timestamps.
*/
private[spark] def isParquetINT96AsTimestamp: Boolean =
getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean

/**
* When set to true, partition pruning for in-memory columnar tables is enabled.
*/
@@ -17,15 +17,20 @@

package org.apache.spark.sql.parquet

import java.sql.Timestamp
import java.util.{TimeZone, Calendar}

import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}

import jodd.datetime.JDateTime
import parquet.column.Dictionary
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
import org.apache.spark.sql.parquet.timestamp.NanoTime

/**
* Collection of converters of Parquet types (group and primitive types) that
@@ -123,6 +128,12 @@ private[sql] object CatalystConverter {
parent.updateDecimal(fieldIndex, value, d)
}
}
case TimestampType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
parent.updateTimestamp(fieldIndex, value)
}
}
// All other primitive types use the default converter
case ctype: PrimitiveType => { // note: need the type tag here!
new CatalystPrimitiveConverter(parent, fieldIndex)
@@ -197,9 +208,11 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
updateField(fieldIndex, value)

protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, readTimestamp(value))

protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
}

protected[parquet] def isRootConverter: Boolean = parent == null

@@ -232,6 +245,13 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
dest.set(unscaled, precision, scale)
}

/**
* Read a Timestamp value from a Parquet Int96Value
*/
protected[parquet] def readTimestamp(value: Binary): Timestamp = {
CatalystTimestampConverter.convertToTimestamp(value)
}
}

/**
@@ -384,6 +404,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
current.setString(fieldIndex, value)

override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
current.update(fieldIndex, readTimestamp(value))

override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
var decimal = current(fieldIndex).asInstanceOf[Decimal]
@@ -454,6 +477,73 @@ private[parquet] object CatalystArrayConverter {
val INITIAL_ARRAY_SIZE = 20
}

private[parquet] object CatalystTimestampConverter {
// TODO: most of this comes from Hive 0.14.
// The Hive code might have some issues, so we need to keep an eye on it.
// We also use NanoTime and Int96Values from parquet-examples,
// and jodd to convert between NanoTime and Timestamp.
val parquetTsCalendar = new ThreadLocal[Calendar]
def getCalendar = {
// this is a cache for the calendar instance.
if (parquetTsCalendar.get == null) {
parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
}
parquetTsCalendar.get
}
val NANOS_PER_SECOND: Long = 1000000000
val SECONDS_PER_MINUTE: Long = 60
val MINUTES_PER_HOUR: Long = 60
val NANOS_PER_MILLI: Long = 1000000

def convertToTimestamp(value: Binary): Timestamp = {
val nt = NanoTime.fromBinary(value)
val timeOfDayNanos = nt.getTimeOfDayNanos
val julianDay = nt.getJulianDay
val jDateTime = new JDateTime(julianDay.toDouble)
val calendar = getCalendar
calendar.set(Calendar.YEAR, jDateTime.getYear)
calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)

// decompose the time-of-day nanos imperatively into hours, minutes, seconds and nanos
var remainder = timeOfDayNanos
calendar.set(
Calendar.HOUR_OF_DAY,
(remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
calendar.set(
Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
val nanos = remainder % NANOS_PER_SECOND
val ts = new Timestamp(calendar.getTimeInMillis)
ts.setNanos(nanos.toInt)
ts
}

def convertFromTimestamp(ts: Timestamp): Binary = {
val calendar = getCalendar
calendar.setTime(ts)
val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
// Hive 0.14 didn't set the hour before getting the day number, although the day
// number should depend on the hour, since the Julian day number increments at 12:00 GMT.
// Here we just follow what Hive does.
val julianDay = jDateTime.getJulianDayNumber

val hour = calendar.get(Calendar.HOUR_OF_DAY)
val minute = calendar.get(Calendar.MINUTE)
val second = calendar.get(Calendar.SECOND)
val nanos = ts.getNanos
// Hive 0.14 uses the hours directly, which might be wrong, since the Julian day
// starts at 12:00. Here we just follow what Hive does.
val nanosOfDay = nanos + second * NANOS_PER_SECOND +
minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
NanoTime(julianDay, nanosOfDay).toBinary
}
}

/**
* A `parquet.io.api.GroupConverter` that converts single-element groups that
* match the characteristics of an array (see
@@ -65,8 +65,8 @@ private[sql] case class ParquetRelation(
ParquetTypesConverter.readSchemaFromFile(
new Path(path.split(",").head),
conf,
sqlContext.conf.isParquetBinaryAsString)

sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp)
lazy val attributeMap = AttributeMap(output.map(o => o -> o))

override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
@@ -83,7 +83,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
// TODO: Why it can be null?
if (schema == null) {
log.debug("falling back to Parquet read schema")
schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
schema = ParquetTypesConverter.convertToAttributes(
parquetSchema, false, true)
}
log.debug(s"list of attributes that will be read: $schema")
new RowRecordMaterializer(parquetSchema, schema)
@@ -184,12 +185,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case t @ StructType(_) => writeStruct(
t,
value.asInstanceOf[CatalystConverter.StructScalaType[_]])
case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
case _ => writePrimitive(schema.asInstanceOf[NativeType], value)
}
}
}

private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
if (value != null) {
schema match {
case StringType => writer.addBinary(
@@ -202,6 +203,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
case ShortType => writer.addInteger(value.asInstanceOf[Short])
case LongType => writer.addLong(value.asInstanceOf[Long])
case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
case ByteType => writer.addInteger(value.asInstanceOf[Byte])
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -307,6 +309,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
}

private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
writer.addBinary(binaryNanoTime)
}
}

// Optimized for non-nested rows
@@ -351,6 +357,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case DoubleType => writer.addDouble(record.getDouble(index))
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.test.TestSQLContext

import parquet.example.data.{GroupWriter, Group}
import parquet.example.data.simple.SimpleGroup
import parquet.example.data.simple.{NanoTime, SimpleGroup}
import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
@@ -63,6 +63,7 @@ private[sql] object ParquetTestData {
optional int64 mylong;
optional float myfloat;
optional double mydouble;
optional int96 mytimestamp;
}"""

// field names for test assertion error messages
@@ -72,7 +73,8 @@
"mystring:String",
"mylong:Long",
"myfloat:Float",
"mydouble:Double"
"mydouble:Double",
"mytimestamp:Timestamp"
)

val subTestSchema =
@@ -98,6 +100,7 @@
optional int64 myoptlong;
optional float myoptfloat;
optional double myoptdouble;
optional int96 mytimestamp;
}
"""

@@ -236,6 +239,7 @@ private[sql] object ParquetTestData {
record.add(3, i.toLong << 33)
record.add(4, 2.5F)
record.add(5, 4.5D)
record.add(6, new NanoTime(1,2))
writer.write(record)
}
writer.close()
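Taken together, the write path (writeTimestamp storing INT96) and the read path (CatalystTimestampConverter plus the int96AsTimestamp flag) in this patch should let a nanosecond-precision timestamp survive a Parquet round trip. A minimal end-to-end sketch against Spark 1.2-era APIs; the case class, app name and output path are hypothetical:

import java.sql.Timestamp

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case class so Scala reflection can derive its schema.
case class Event(id: Int, ts: Timestamp)

object TimestampRoundTrip {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ts-roundtrip").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Nanosecond-precision value; java.sql.Timestamp keeps a separate nanos field.
    val ts = Timestamp.valueOf("2015-01-20 11:21:00.123456789")

    // Write: the new writeTimestamp path stores the column as INT96.
    sc.parallelize(Seq(Event(1, ts))).saveAsParquetFile("/tmp/events.parquet")

    // Read: with spark.sql.parquet.int96AsTimestamp=true (the default),
    // the INT96 column comes back as TimestampType with nanos intact.
    sqlContext.parquetFile("/tmp/events.parquet").collect().foreach(println)

    sc.stop()
  }
}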