
Conversation

@cloud-fan
Contributor

@cloud-fan cloud-fan commented Apr 10, 2018

What changes were proposed in this pull request?

This API change is inspired by the problems we hit when migrating streaming and file-based data sources to the data source v2 API.

For the streaming side, we need a variant of DataReaderFactory/DataWriterFactory (see an example). This brings a lot of trouble for scanning/writing optimized data formats like InternalRow, ColumnarBatch, etc.

These special scanning/writing interfaces are defined like

interface SupportsScanUnsafeRow {
  List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
}

This can't work with ContinuousDataReaderFactory at all, or we have to do runtime type casts and make the variant extend DataReaderFactory/DataWriterFactory. We have the same problem on the write path too.

For the file-based data sources, we have a problem with code duplication. Let's take the ORC data source as an example. To support both unsafe row and columnar batch scans, we need something like

class OrcUnsafeRowDataReader extends DataReader[UnsafeRow] {
  ...
}

class OrcColumnarBatchDataReader extends DataReader[ColumnarBatch] {
  ...
}

class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
  def createDataReader ...
}

class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
  def createDataReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createUnsafeRowFactories = ... // logic to prepare the parameters and create factories

  def createColumnarBatchFactories = ... // logic to prepare the parameters and create factories
}

You can see that we have duplicated logic for preparing parameters and defining the factories. After this change, we can simplify the code to

class OrcReaderFactory(...) extends DataReaderFactory {
  def createUnsafeRowReader ...

  def createColumnarBatchReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createReadFactories = ... // logic to prepare the parameters and create factories
} 

The proposed change is: remove the type parameter and embed the special scanning/writing format into the factory, e.g.

interface DataReaderFactory {
  DataFormat dataFormat();

  default DataReader<Row> createRowDataReader() {
    throw new IllegalStateException(
      "createRowDataReader must be implemented if the data format is ROW.");
  }

  default DataReader<UnsafeRow> createUnsafeRowDataReader() {
    throw new IllegalStateException(
      "createUnsafeRowDataReader must be implemented if the data format is UNSAFE_ROW.");
  }

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException(
      "createColumnarBatchDataReader must be implemented if the data format is COLUMNAR_BATCH.");
  }
}

A potential benefit of this change: it is now up to the factory to decide which data format to use (UnsafeRow, ColumnarBatch, etc.), which means different data partitions can be scanned with different formats. Some hybrid storage systems keep realtime data in row format and historical data in columnar format, and they fit the new API well.
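
For illustration, here is a rough sketch of how such a hybrid source could implement the proposed interface and pick the format per partition (this is not part of the PR; HybridPartition, RealtimeUnsafeRowReader and HistoryColumnarBatchReader are made-up names):

class HybridReaderFactory(partition: HybridPartition) extends DataReaderFactory {

  // Realtime partitions are scanned as unsafe rows, historical partitions as columnar batches.
  override def dataFormat(): DataFormat =
    if (partition.isRealtime) DataFormat.UNSAFE_ROW else DataFormat.COLUMNAR_BATCH

  // Only the method matching dataFormat() needs an override; the other create
  // methods keep the default throwing implementation from the interface.
  override def createUnsafeRowDataReader(): DataReader[UnsafeRow] =
    new RealtimeUnsafeRowReader(partition)

  override def createColumnarBatchDataReader(): DataReader[ColumnarBatch] =
    new HistoryColumnarBatchReader(partition)
}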

TODO:
apply this change to the write path (next PR)

How was this patch tested?

existing tests

@cloud-fan cloud-fan changed the title remove type parameter in DataReaderFactory [WIP][SPARK-23952] remove type parameter in DataReaderFactory Apr 10, 2018
@cloud-fan
Contributor Author

cc @jose-torres, do you know what's missing for this?

Contributor

I don't, because I'm not really sure how it works in the batch case. How does it work to do

new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]

when the type parameter of batchReaderFactories doesn't match InternalRow?

Contributor Author

We use a type erasure hack and lie to the Scala compiler that we are outputting InternalRow. At runtime, the generated code casts the data back to ColumnarBatch.

Contributor

Then the missing piece is codegen. This is difficult because the continuous stream reader does a lot of auxiliary work, so I don't know if it will happen in the near future.

Contributor

I've thought about this further. Shouldn't it be trivial to write a wrapper that simply converts a DataReader[ColumnarBatch] to a DataReader[InternalRow]? If so then we can easily support it after the current PR.
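
Roughly like this, perhaps (just a sketch of the wrapper idea, not code from this PR; the class name is made up, and the imports assume the Spark 2.3 class locations):

import java.util.Collections
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.DataReader
import org.apache.spark.sql.vectorized.ColumnarBatch

// Flattens each ColumnarBatch into its rows so downstream code can consume
// the reader as a DataReader[InternalRow].
class ColumnarBatchToRowReader(batchReader: DataReader[ColumnarBatch])
  extends DataReader[InternalRow] {

  // Rows of the current batch; starts out empty.
  private var rows: java.util.Iterator[InternalRow] = Collections.emptyIterator[InternalRow]()

  override def next(): Boolean = {
    // Pull the next batch whenever the current one is exhausted.
    while (!rows.hasNext) {
      if (!batchReader.next()) {
        return false
      }
      rows = batchReader.get().rowIterator()
    }
    true
  }

  override def get(): InternalRow = rows.next()

  override def close(): Unit = batchReader.close()
}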

Contributor Author

@cloud-fan cloud-fan Apr 10, 2018

I have seen this pattern many times; the Java List is a little troublesome because it's invariant. Shall we change the interface to use an array?
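
For context, a minimal, self-contained illustration of the invariance problem with toy types (not Spark's): a java.util.List of a concrete factory subtype is not a java.util.List of the interface type, which is what forces casts like the ones in this PR.

object InvarianceDemo {
  import scala.collection.JavaConverters._

  trait ReaderFactory[T]
  class MyUnsafeRowFactory extends ReaderFactory[String]

  val factories: Seq[MyUnsafeRowFactory] = Seq(new MyUnsafeRowFactory)

  // Does not compile: java.util.List[MyUnsafeRowFactory] is not a
  // java.util.List[ReaderFactory[String]], since java.util.List is invariant.
  // val bad: java.util.List[ReaderFactory[String]] = factories.asJava

  // Compiles: up-cast each element before converting (or cast the whole list
  // with asInstanceOf, as the current code does).
  val ok: java.util.List[ReaderFactory[String]] =
    factories.map(f => f: ReaderFactory[String]).asJava
}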

Contributor

I'd like that, but I don't know if that would make things harder for data source implementers working in Java.

Contributor Author

Array is a Java-friendly type.

@SparkQA

SparkQA commented Apr 10, 2018

Test build #89133 has finished for PR 21029 at commit d44105d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

gengliangwang commented Apr 10, 2018

+1
From PR #20933, we can see that there is a lot of common code between DataReaderFactory[ColumnarBatch] and DataReaderFactory[UnsafeRow] with the current factory-method pattern.
This change makes data source implementations easier, and we don't need to do runtime type casts.

Member

@gengliangwang gengliangwang left a comment

In DataReader.java:

/**
 * A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
 * outputting data for a RDD partition.
 *
 * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
 * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
 * readers that mix in {@link SupportsScanUnsafeRow}.
 */

The first and last @link need to be updated.

Contributor Author

cc @jose-torres, it seems this method is never used.

Contributor

Fine to remove this. We've deferred or reworked all of the things that were going to use this method; it makes sense to rethink how to provide this functionality after the rest is polished and stable-ish.

@cloud-fan cloud-fan changed the title [WIP][SPARK-23952] remove type parameter in DataReaderFactory [SPARK-23952] remove type parameter in DataReaderFactory Apr 16, 2018
@SparkQA

SparkQA commented Apr 16, 2018

Test build #89400 has finished for PR 21029 at commit dac302f.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 16, 2018

Test build #89401 has finished for PR 21029 at commit 556eef2.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 17, 2018

Test build #89430 has finished for PR 21029 at commit 36031f2.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 17, 2018

Test build #89436 has finished for PR 21029 at commit b5c3b39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 17, 2018

Test build #89458 has finished for PR 21029 at commit c5071b4.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 17, 2018

Test build #89461 has finished for PR 21029 at commit 1ae4b6d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 18, 2018

Test build #89482 has finished for PR 21029 at commit 18e391a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 18, 2018

Test build #89490 has finished for PR 21029 at commit 18e391a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 18, 2018

Test build #89505 has finished for PR 21029 at commit 18e391a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* <li>@{@link DataFormat#COLUMNAR_BATCH}: {@link #createColumnarBatchDataReader()}</li>
* </ul>
*/
DataFormat dataFormat();
Contributor

If the data format is determined when the factory is created, then I don't see why it is necessary to change the API. This just makes it more confusing.


package org.apache.spark.sql.sources.v2.reader.streaming;

import java.util.Optional;
Contributor

Nit: this is a cosmetic change that should be reverted before committing.

case DataFormat.COLUMNAR_BATCH =>
new DataReaderIterator(factory.createColumnarBatchDataReader())
// TODO: remove this type erase hack.
.asInstanceOf[DataReaderIterator[UnsafeRow]]
Contributor

Isn't this change intended to avoid these casts?

range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}
factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
factories.map(_.asInstanceOf[DataReaderFactory]).asJava
Contributor

Why is this cast necessary?

@cloud-fan cloud-fan closed this Jul 6, 2018