
Conversation

@gengliangwang
Member

What changes were proposed in this pull request?

Migrate ORC file format read path to data source V2.

Supports:

  1. Scan ColumnarBatch
  2. Scan UnsafeRow
  3. Push down filters
  4. Push down required columns

Not supported (due to limitations of data source V2):

  1. Read multiple file paths
  2. Read bucketed files.

How was this patch tested?

Unit tests.

@SparkQA

SparkQA commented Mar 29, 2018

Test build #88702 has finished for PR 20933 at commit 40b33c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FallBackToOrcV1(sparkSession: SparkSession) extends Rule[LogicalPlan]
  • class EmptyDataReader[T] extends DataReader[T]
  • case class OrcBatchDataReaderFactory(
  • case class OrcColumnarBatchDataReader(iter: Iterator[InternalRow])
  • class OrcDataSourceV2 extends DataSourceV2 with ReadSupport with ReadSupportWithSchema
  • class OrcDataSourceReader(options: DataSourceOptions, userSpecifiedSchema: Option[StructType])
  • case class OrcUnsafeRowReaderFactory(
  • case class OrcUnsafeRowDataReader(iter: Iterator[InternalRow])
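
For orientation, a minimal hypothetical sketch of how classes like those listed above fit together under the Spark 2.3-era data source V2 read API. Class and method names beyond the list are assumptions, not this PR's actual code, and signatures follow my reading of the 2.3 API rather than the patch itself:

```scala
import java.util.{Collections, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{LongType, StructType}

// Hypothetical skeleton (not the PR's code): the source hands out a DataSourceReader,
// which reports a schema and produces one reader factory per planned file split.
// Column pruning, filter pushdown, and the ColumnarBatch / UnsafeRow scan variants
// would be layered on via the SupportsPushDown* / SupportsScan* mixins.
class ExampleOrcDataSourceV2 extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new ExampleOrcDataSourceReader(options)
}

class ExampleOrcDataSourceReader(options: DataSourceOptions) extends DataSourceReader {
  // A real reader infers this from the ORC footer (or takes a user-specified
  // schema via ReadSupportWithSchema); hardcoded here to keep the sketch small.
  override def readSchema(): StructType = new StructType().add("id", LongType)

  // One factory per file split; empty here, so this example source scans nothing.
  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
    Collections.emptyList()
}
```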

Member

requestedPartitionColds -> requestedPartitionColIds

@gatorsmile
Member

gatorsmile commented Mar 29, 2018

Let us trigger more tests by changing spark.sql.sources.default to orc and see whether all the tests can pass.
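
For reference, one way to try this locally for a single session, without editing the default in the source tree (this assumes a SparkSession named spark is in scope):

```scala
// Make ORC the default format so that format-agnostic code paths and tests
// exercise the ORC source instead of Parquet.
spark.conf.set("spark.sql.sources.default", "orc")
spark.range(10).write.saveAsTable("t")  // now written with the ORC source
```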

@SparkQA

SparkQA commented Mar 29, 2018

Test build #88711 has finished for PR 20933 at commit a3e084a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

@jose-torres @cloud-fan

Contributor

@jose-torres left a comment

A few high level questions.

Contributor

It seems weird that DataFrameReader is modified here. Will DataSourceV2 implementations generally need to modify DataFrameReader, or is it just a temporary hack because of the mentioned lack of support? In the latter case, is there a plan to add this support soon?

Member Author

This is a temporary hack. I think @cloud-fan will create a PR to support reading multiple files soon.

Contributor

Yeah, it's a temporary hack. We will support multi-path soon.

Contributor

What about bucketed reads? Will they need a similar change here, or is that lack of support handled elsewhere? (Or am I misunderstanding something about that part of the description? I'm not super familiar with the ORC source.)

Contributor

We only support bucketing with tables, while data source v2 can't work with tables yet.

Contributor

How does v2 not support reading multiple files?

Contributor

Because we need to define how to pass multiple paths via options. I have a PR to fix it; I'll bring it up to date.

Contributor

Would it make sense to split the refactoring changes into their own PR? It's hard to tell at a glance which parts of the change are refactoring and which are new V2 implementation.

Member Author

Yes that is a good idea.

Contributor

I'm afraid these refactors only make sense in this PR, for reusing code between v1 and v2.

Contributor

Why does this only make sense for this PR? It looks like this is a reasonable refactor that could be stand-alone.

Contributor

Moving code needs a reason. The reason here is to help us reuse the code. But if we do it in another PR, what is the reason? It doesn't make the code clearer, IMO.

Contributor

Better organization to support other changes like this one is the reason.

@jose-torres was right to point out that these changes are self-contained enough to go in a separate PR, and @gengliangwang and I both agreed. Why make this commit larger than necessary?

Contributor

If we agree that a separate PR is self-contained and can help this PR, I'm also OK with it.

Contributor

Yeah, I think the commit itself would be self-contained reorganization. The motivation is to refactor for this PR, which is okay.

Contributor

Shouldn't Spark handle this already when it sees that OrcDataSourceV2 doesn't implement WriteSupport?

Member Author

Not for InsertIntoTable.

Contributor

Is this also a temporary hack, then? It seems like Spark should know it can't write to a source which doesn't implement WriteSupport, no matter what the shape of the query performing the write is.

Contributor

This is a temporary hack. v1 has the same problem: when inserting into a table backed by a non-writable data source, Spark would fail during planning.

Contributor

I agree with @jose-torres. If there is a general problem when writes aren't supported, then shouldn't this be a generic rule that provides a good error message?
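
For illustration, the generic guard being suggested could be as small as this (purely hypothetical names; inside Spark the error would likely surface as an AnalysisException during analysis rather than as shown here):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceV2, WriteSupport}

// Purely illustrative: one generic check with a clear message for any v2 source
// that cannot accept writes, instead of a per-source rule like FallBackToOrcV1.
def requireWriteSupport(source: DataSourceV2): WriteSupport = source match {
  case ws: WriteSupport => ws
  case _ => throw new UnsupportedOperationException(
    s"Data source ${source.getClass.getName} does not support writes")
}
```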

Contributor

Add param docs for them.

Contributor

It's not only missing columns, but also partition columns.

Contributor

There is a lot of duplication between this and OrcBatchDataReaderFactory, and we may have more when migrating other file formats.

@SparkQA

SparkQA commented Mar 31, 2018

Test build #88773 has finished for PR 20933 at commit 29de999.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

also cc @rdblue

@SparkQA

SparkQA commented Apr 3, 2018

Test build #88843 has finished for PR 20933 at commit e4cd8a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 4, 2018

Test build #88859 has finished for PR 20933 at commit ffbf2f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 4, 2018

Test build #88897 has finished for PR 20933 at commit 35b74c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

retest this please.

@SparkQA

SparkQA commented Apr 5, 2018

Test build #88923 has finished for PR 20933 at commit 35b74c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

retest this please.

@cloud-fan
Contributor

retest this please

Contributor

Why is this class public? Isn't this internal to HadoopFsRelation's v2 implementation?

Contributor

spark.sql.disabledV2DataSources

Member Author

This follows disabledV2StreamingMicroBatchReaders, and currently this PR only supports reading.

Member

We need a better name.

Contributor

its v2 implementation is disabled. Reads from these sources will fall back to the V1 implementation.
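
For concreteness, a sketch of how such an entry is usually declared inside object SQLConf; the key follows the name suggested above and the doc text follows this thread, but both are still under discussion, so treat this as an assumption rather than the PR's final wording:

```scala
// Sketch only: a fragment that would live in org.apache.spark.sql.internal.SQLConf.
val DISABLED_V2_FILE_DATA_SOURCES = buildConf("spark.sql.disabledV2DataSources")
  .doc("A comma-separated list of data source short names for which the v2 " +
    "implementation is disabled. Reads from these sources will fall back to the " +
    "v1 implementation.")
  .stringConf
  .createWithDefault("")
```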

Contributor

This implies requestedColIds[i] and requestedPartitionColIds[i] may both be non-negative. Is that possible?

Member Author

@gengliangwang commented Apr 5, 2018

Yes, here requestedColIds means the actual required columns, including the partition columns.

Contributor

we should also apply this check in the copyToSpark branch

Contributor

Also add a comment for this.

Contributor

We only support bucketing with tables, while data source v2 can't work with tables yet.

Contributor

We can also move this log to PartitionedFileUtil.maxSplitBytes

Contributor

don't hardcode it

Contributor

Why do we have both PartitionedFileUtil and FilePartitionUtil?

Member Author

PartitionedFileUtil is about how we get PartitionedFiles.
FilePartitionUtil is about how we get FilePartitions and convert them to InternalRows.
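
For orientation, the two building blocks look roughly like this (constructor shapes are from my reading of Spark 2.3's internal datasources package; treat them as approximate):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// A PartitionedFile is one readable chunk of one file; a FilePartition groups
// several chunks into the unit of work handled by one Spark task.
val chunk = PartitionedFile(
  partitionValues = InternalRow.empty,
  filePath = "/tmp/data/part-00000.orc",
  start = 0L,
  length = 134217728L)
val task = FilePartition(index = 0, files = Seq(chunk))
```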

Contributor

Explain why this works, i.e. we use a type erasure hack to return ColumnarBatch.
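
To spell out the trick for readers of this thread, an illustrative reduction of the type-erasure hack (not the PR's code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Generics are erased at runtime, so an iterator of ColumnarBatch can be exposed
// through an Iterator[InternalRow]-typed interface. A consumer that knows the scan
// is columnar casts the elements back to ColumnarBatch; nothing in between checks
// the element type, which is exactly why this deserves a comment in the code.
def eraseToRowIterator(batches: Iterator[ColumnarBatch]): Iterator[InternalRow] =
  batches.asInstanceOf[Iterator[InternalRow]]
```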

Contributor

Which file source doesn't support this? I think all file sources support partitioning.

Member Author

I thought some of the file sources would choose SupportsPushDownFilters instead of SupportsPushDownCatalystFilters. Not very sure about this.

Contributor

we should not remove tests

Contributor

I think it is a bad idea to continue using PartitionedFile => Iterator[InternalRow] in v2.

I understand not wanting to change much about how this works, just to get the code behind the v2 API. But this pattern is broken and causes resource problems that the v2 API nudges implementers to fix.

What resource problems? This doesn't implement close properly, forcing close to happen at task end by calling functions registered when files are opened. We've gone back through and replaced the iterators with closeable versions so that we release resources more quickly because the callback-based close does not scale.

I would like to see this problem fixed instead of copying it into v2.
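
A minimal sketch of the closeable-reader shape being described, with hypothetical names and no ORC specifics; the point is only that the reader owns its resource and releases it as soon as the split is exhausted, instead of waiting for a task-completion callback:

```scala
import java.io.Closeable

import org.apache.spark.sql.catalyst.InternalRow

// Illustrative only: a per-split reader that closes its underlying file handle
// eagerly when the split runs out of rows (and stays idempotent if closed again
// at task end).
class EagerlyClosingRowReader(rows: Iterator[InternalRow], resource: Closeable) {
  private var closed = false

  def next(): Boolean = {
    val hasNext = rows.hasNext
    if (!hasNext) close()  // release the file handle as soon as the split is done
    hasNext
  }

  def get(): InternalRow = rows.next()

  def close(): Unit = if (!closed) {
    closed = true
    resource.close()
  }
}
```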

Member Author

Yes, I was quite frustrated when I updated the code to use PartitionedFile => Iterator[InternalRow] as V1 did.
I was trying to reduce duplicated code between the vectorized reader and the unsafe row reader, and to reuse the code in FileScanRDD.
I know this makes the V2 implementation meaningless. I will keep looking for a good solution.

Contributor

BTW, I think it's also OK if we know what we want in the final version and the intermediate change tries to minimize code changes (I haven't looked at the PR at all, so don't interpret this comment as endorsing or not endorsing the PR design).

Member Author

With #21029, we can get rid of this.

Contributor

Why is this named "compute" and not "open" or something more specific?

Contributor

Why is this necessary?

Member Author

Just in case this implementation has a bug or regression. Following DISABLED_V2_STREAMING_MICROBATCH_READERS.

@SparkQA

SparkQA commented Apr 5, 2018

Test build #88943 has finished for PR 20933 at commit 35b74c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 9, 2018

Test build #89061 has finished for PR 20933 at commit 9bde159.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

gengliangwang commented Apr 9, 2018

Discussed with @cloud-fan offline. The conclusion is that we can use a simple factory pattern for the data reader factories, so that we can easily avoid redundant code and stop using PartitionedFile => Iterator[InternalRow].
He has created #21029. I will continue updating this one after his PR is merged.

@SparkQA

SparkQA commented Apr 16, 2018

Test build #89392 has finished for PR 20933 at commit 80b36f3.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 14, 2018

Test build #90584 has finished for PR 20933 at commit 67b1748.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  .format(new Date()) + "-" + UUID.randomUUID()
val writer = ds.asInstanceOf[WriteSupport]
  .createWriter(jobId, df.logicalPlan.schema, mode, options)
Contributor

I am not sure I understand this: why do we use .createWriter here, but not .createReader in DataFrameReader? It seems asymmetrical to me.

Contributor

It is. We're still evolving the v2 API and integration with Spark. This problem is addressed in PR #21305, which is the first of a series of changes to standardize the logical plans and fix problems like this one.

There's also an open proposal for those changes.

@gengliangwang
Member Author

Status update: we are working on a new proposal for changing the data source API, to resolve the problems exposed in this PR.
Until the new proposal is accepted or rejected, this PR remains open.
