[SPARK-10364][SQL] Support Parquet logical type TIMESTAMP_MILLIS #15332
Conversation
Test build #66268 has finished for PR 15332 at commit
Parquet supports both milliseconds and microseconds: we should be able to read both of them, but write as TIMESTAMP_MICROS by default?
Where does this come from? It's different from what's in the Parquet doc (since epoch).
@davies Thank you for your comments, Davies. Per Lian's comment in the JIRA (https://issues.apache.org/jira/browse/SPARK-8824), parquet 1.8, which is what we are currently using, does not support TIMESTAMP_MICROS yet. He suggested we implement TIMESTAMP_MILLIS for now.
@davies On your second comment, let me check and get back to you.
@davies Many thanks! You are right, we need to write it as milliseconds since the epoch. I will send an update. Thanks again!
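For reference, here is a minimal sketch of the two conversions under discussion. Spark SQL's internal TimestampType value is microseconds since the epoch, while Parquet TIMESTAMP_MILLIS stores milliseconds since the epoch; the helper names below are hypothetical, not Spark's actual API.

```scala
// Illustrative sketch only; helper names are hypothetical.
object TimestampMillisSketch {
  // Write path: drop the microsecond fraction. floorDiv keeps pre-epoch
  // (negative) values in the correct millisecond bucket.
  def microsToMillis(us: Long): Long = Math.floorDiv(us, 1000L)

  // Read path: pad a zero microsecond part onto the stored millis.
  def millisToMicros(ms: Long): Long = ms * 1000L
}
```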
Test build #66307 has finished for PR 15332 at commit
retest this please
@davies Unfortunately parquet-mr 1.8.1, which is used by the current master, hadn't included
isParquetINT64AsTimestampMillis here?
@viirya Thank you. Will change.
Return type should be SQLTimestamp? Input type should be Long?
@viirya Will make the change. Thanks!
Test build #66309 has finished for PR 15332 at commit
Test build #66339 has finished for PR 15332 at commit
For the vectorized reader, I think we should also add TimestampType support for INT64 in decodeDictionaryIds?
@dilipbiswal Per our offline discussion, I think you should add TimestampType support for INT64 in decodeDictionaryIds. In order to test it, a test case mixing dictionary-encoded and non-dictionary-encoded values is needed.
I've tested the following test case:

```scala
test("SPARK-10634 timestamp written and read as INT64 - TIMESTAMP_MILLIS") {
  val data = (1 to 1000).map { i =>
    if (i < 500) {
      Row(new java.sql.Timestamp(10))
    } else {
      Row(new java.sql.Timestamp(i))
    }
  }
  val schema = StructType(List(StructField("time", TimestampType, false)).toArray)
  withSQLConf(ParquetOutputFormat.DICTIONARY_PAGE_SIZE -> "64",
      ParquetOutputFormat.PAGE_SIZE -> "128") {
    withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
      withTempPath { file =>
        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
        df.coalesce(1).write.parquet(file.getCanonicalPath)
        ("true" :: Nil).foreach { vectorized =>
          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
            val df2 = spark.read.parquet(file.getCanonicalPath)
            checkAnswer(df2, df.collect().toSeq)
          }
        }
      }
    }
  }
}
```
It will cause an exception:
```
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4, localhost): java.lang.UnsupportedOperationException: Unimplemented type: TimestampType
[info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.decodeDictionaryIds(VectorizedColumnReader.java:256)
[info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:177)
[info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
```
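For context, a rough sketch of the missing branch, written in Scala for brevity even though the actual VectorizedColumnReader is Java; all names below are illustrative, not the real method signature. The idea is that when an INT64 column carries TIMESTAMP_MILLIS data, each dictionary-decoded value must be scaled from milliseconds up to Spark's microsecond representation:

```scala
// Illustrative Scala sketch of logic that belongs in the Java method
// VectorizedColumnReader.decodeDictionaryIds; all names are hypothetical.
def decodeTimestampMillisDictionary(
    rowId: Int,
    num: Int,
    dictionaryIds: Array[Int],              // per-row dictionary ids for this batch
    dictLookup: Int => Long,                // dictionary id -> INT64 millis value
    putLong: (Int, Long) => Unit): Unit = { // writes into the column vector
  var i = rowId
  while (i < rowId + num) {
    val millis = dictLookup(dictionaryIds(i))
    putLong(i, millis * 1000L)              // pad to microseconds for TimestampType
    i += 1
  }
}
```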
@viirya Thanks Simon. Very good catch! I have made the changes.
We have a block of comments and code for TimestampType below. Can we move this branch into that block? And we should add a few comments about this change.
@viirya Done.
Can you add a test that writes a timestamp field with SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS set to true, but reads it back with SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS set to false?
@viirya Done
Should we merge these configs into a single one, spark.sql.parquet.timestampAs (a better name?), which could be int96, millisecond, or microsecond (to be supported in the future)?
@davies Thanks Davies. I have a couple of questions.
- If we externalized a config in prior releases, can we just change it, or do we need to stay backward compatible?
- Reading the description and usage of the existing config 'spark.sql.parquet.int96AsTimestamp', it seems it applies to the read path, whereas the new one introduced in this PR applies to the write path.
- Should we change the semantics of the proposed common property to control only the write encoding, and make reading based solely on the schema metadata, i.e. type + original type? If you agree, then maybe we could go with spark.sql.parquet.timestamp.encoding? I am OK with spark.sql.parquet.timestampAs as well.

Did you want this change as part of this PR? Thanks a lot for your input, as always.
Test build #66401 has finished for PR 15332 at commit
Test build #66403 has finished for PR 15332 at commit
Why this check?
We need to convert the time back to micros, and in the case of lazy decoding we don't get that chance?
Test build #66435 has finished for PR 15332 at commit
LGTM. See if @davies @liancheng have other comments about this.
a907563 to c376b4e (force-pushed)
Test build #67278 has finished for PR 15332 at commit
Would it be helpful to submit a new PR with the conflicts resolved? If not, what are the next steps for this issue?
@saulshanabrook Hello, thanks for your comment. Currently, I am waiting for feedback from @liancheng and @davies. Perhaps this is not a priority now. I will try to resolve the conflicts and push in any case.
c376b4e to 796b6b1 (force-pushed)
Test build #75306 has finished for PR 15332 at commit
ueshin left a comment:
LGTM, except for minor comments.
```scala
writeLegacyParquetFormat = true)

testSchema(
  "Timestmp written and read as INT64 with TIMESTAMP_MILLIS",
```
nit: Timestmp -> Timestamp
@ueshin Thanks. Done.
```scala
if (us < 0 && (us % MILLIS_PER_SECOND < 0)) {
  millis = millis - 1
}
```
Can't we use Math.floor() here, the same as in millisToDays?
@ueshin Thanks a lot. I have made the change.
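A tiny illustration of why floor semantics matter for pre-epoch (negative) values, where plain integer division truncates toward zero:

```scala
val us = -1L                            // one microsecond before the epoch
val truncated = us / 1000L              // 0: lands in the wrong millisecond
val floored = Math.floorDiv(us, 1000L)  // -1: the correct millisecond bucket
```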
Test build #75347 has finished for PR 15332 at commit
squito left a comment:
I just found a couple of tiny nits. I will defer to others on this change.
I guess we'll want to add something similar to SPARK-12297 for int64 as well eventually, but I don't think they need to go in together, especially as both are in flight right now.
```scala
}

/*
 * Converts the timestamp to milliseconds since epoc. In spark timestamp values have microseconds
```
typo: epoch
```diff
  DecimalType.is64BitDecimalType(column.dataType())) {
    defColumn.readLongs(
-       num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
```
Why this change in indentation? If anything, it looks like it should be indented less than the original.
@squito Thank you for reviewing. I have fixed the indentation.
Test build #75361 has finished for PR 15332 at commit
Thanks! Merging to master.
Thanks a lot @ueshin @viirya @gatorsmile |
Hi @dilipbiswal, do you mind sharing how you generated the testing parquet file? Thanks!
@cloud-fan Hi Wenchen, it's been a while, so I am trying my best to recollect. I think once I had the write code implemented in Spark, I used it to produce files. Depending on the data, Parquet uses different encodings (plain or dictionary). I examined the encodings and the data using parquet-tools. I produced two files and used the merge option in the tools to merge them into one file. This is to the best of my recollection :-)
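For anyone trying to reproduce this, here is a rough sketch of that recipe; the paths, values, and exact parquet-tools invocations are illustrative, and `spark` is assumed to be an existing SparkSession:

```scala
import spark.implicits._

// Low-cardinality data tends to get dictionary encoding; high-cardinality
// data tends to fall back to plain encoding.
val repeated = Seq.fill(1000)(java.sql.Timestamp.valueOf("2016-01-01 00:00:00"))
val distinct = (1 to 1000).map(i => new java.sql.Timestamp(i.toLong))

repeated.toDF("time").coalesce(1).write.parquet("/tmp/dict_encoded")
distinct.toDF("time").coalesce(1).write.parquet("/tmp/plain_encoded")

// Then inspect the encodings and merge the two files with parquet-tools, e.g.:
//   parquet-tools meta <file>
//   parquet-tools merge <fileA> <fileB> <merged>
```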
Great, thanks!
What changes were proposed in this pull request?
Description from JIRA
The TimestampType in Spark SQL is of microsecond precision. Ideally, we should convert Spark SQL timestamp values into Parquet TIMESTAMP_MICROS, but unfortunately parquet-mr doesn't support it yet.
For the read path, we should be able to read TIMESTAMP_MILLIS Parquet values and pad a zero microsecond part onto the values we read.
For the write path, we currently write timestamps as INT96, similar to Impala and Hive. One alternative is to add a separate SQL option that lets users write Spark SQL timestamp values as TIMESTAMP_MILLIS; of course, the microsecond part will then be truncated.
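For illustration, a minimal sketch of how the new option would be used; the conf key is assumed from the SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS constant referenced in the review, and `spark` is an existing SparkSession:

```scala
import spark.implicits._

// Assumed conf key, mirroring SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS:
// write timestamps as INT64/TIMESTAMP_MILLIS instead of INT96.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")

Seq(java.sql.Timestamp.valueOf("2016-01-01 00:00:00.000001"))
  .toDF("time")
  .write.parquet("/tmp/ts_millis")  // the microsecond part is truncated

// Reading back does not depend on the flag: the Parquet footer records the
// TIMESTAMP_MILLIS logical type, so the schema drives the read path.
spark.read.parquet("/tmp/ts_millis").show()
```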
How was this patch tested?
Added new tests in ParquetQuerySuite and ParquetIOSuite