
Conversation

@HyukjinKwon (Member) commented Dec 11, 2023

What changes were proposed in this pull request?

This PR is the same as #44233, but instead of using V1Table it uses the original DSv2 interface, reusing the UDTF execution code.

Why are the changes needed?

So that Python Data Sources can be used everywhere else as well, including SparkR and Scala.

Does this PR introduce any user-facing change?

Yes. Users can register their Python Data Sources and use them in SQL, SparkR, etc.

How was this patch tested?

Unit tests were added, and the change was manually tested.

Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44269
Closes #44233
Closes #43784

@HyukjinKwon HyukjinKwon force-pushed the SPARK-45597-3 branch 3 times, most recently from 274d08f to 2e9c06f Compare December 13, 2023 01:04
@HyukjinKwon HyukjinKwon marked this pull request as ready for review December 13, 2023 06:58
* there is no corresponding Data Source V2 implementation, or the provider is configured to
* fallback to Data Source V1 code path.
*/
def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
Contributor:

Member Author (@HyukjinKwon):

yeah I like that idea. Can I do it in a followup though? I would like to extract some changes from your PR, and make another PR.

Contributor:

It's not a followup... I have a concern about changing lookupDataSource, which is only used for the DS v1 path. Let's avoid the risk of breaking anything. It's also a smaller code change if we only instantiate the PythonTableProvider here, so that the existing callers of lookupDataSource can still instantiate the objects directly instead of calling the new newDataSourceInstance function.
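
A rough sketch of that suggestion (illustrative only; the Python-source lookup helper and the PythonTableProvider constructor shape below are assumed, not this PR's exact code):

    def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
      // Only the Python data source is instantiated here; existing DS v1
      // callers of lookupDataSource keep creating instances themselves.
      if (isUserRegisteredPythonSource(provider)) {    // assumed helper
        Some(new PythonTableProvider(provider))        // constructor shape assumed
      } else {
        DataSource.lookupDataSource(provider, conf)
          .getDeclaredConstructor().newInstance() match {
            case t: TableProvider => Some(t)
            case _ => None
          }
      }
    }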

Member Author (@HyukjinKwon):

Oh, okie dokie. I was actually thinking about porting more changes from your PR. I will fix that one alone here for now.

val schema = StructType.fromDDL("id INT, partition INT")
val dataSource = createUserDefinedPythonDataSource(
name = dataSourceName, pythonScript = dataSourceScript)
spark.dataSource.registerPython(dataSourceName, dataSource)
Contributor:

why do we need the extra registration?

Member Author (@HyukjinKwon):

Previously, UserDefinedPythonDataSource was able to create a DataFrame directly (from a LogicalRelation) via UserDefinedPythonDataSource.apply.

Now that is no longer possible because we're using DSv2. So here we register the data source and load it via DataFrameReader to create a DataFrame for testing.
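
For reference, the test flow now looks roughly like this (a sketch that assumes the surrounding test fixtures such as spark, schema, dataSourceName, and dataSource are in scope):

    // Register the Python data source, then load it through the DSv2 path
    // with DataFrameReader instead of building a LogicalRelation directly.
    spark.dataSource.registerPython(dataSourceName, dataSource)
    val df = spark.read
      .format(dataSourceName)   // resolved via the DSv2 lookup
      .schema(schema)           // user-specified schema from the test
      .load()
    assert(df.schema == schema)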

Contributor (@allisonwang-db) left a comment:

Looks great!

properties: java.util.Map[String, String]): Table = {
assert(partitioning.isEmpty)
val outputSchema = schema
new Table with SupportsRead {
Contributor:

We can create a new class PythonTable to make it more extensible in the future.

Member Author (@HyukjinKwon) commented Dec 14, 2023:

Actually, I intentionally put it together because we should cache dataSourceInPython, the result of running the Python worker (it contains both the schema and the pickled data source), which is used once for schema inference and once for getting partitions. Keeping it here makes the code more readable and localizes the scope of the cache. In addition, I don't think we are likely to extend this Python Table class/instance.
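
Roughly, the intent is the following shape (an illustrative sketch; the worker-call helper and field names are assumed, not the PR's exact code):

    new Table with SupportsRead {
      // Runs the Python worker at most once; the result carries both the
      // inferred schema and the pickled data source, and is reused for
      // schema inference and for planning the read.
      private lazy val dataSourceInPython = runPythonWorker(shortName, options)  // helper name assumed

      override def schema(): StructType = dataSourceInPython.schema

      // name(), capabilities(), newScanBuilder(...) reuse the same cached value
      // ...
    }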

    largeVarTypes: Boolean,
    pythonRunnerConf: Map[String, String],
-   pythonMetrics: Map[String, SQLMetric],
+   pythonMetrics: Option[Map[String, SQLMetric]],
Contributor:

Why do we need to change this to Optional?

Member Author (@HyukjinKwon) commented Dec 14, 2023:

In order to reuse MapInBatchEvaluatorFactory to read the data on the executor side. We should integrate this with Scan.supportedCustomMetrics, though.
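
In other words (a minimal sketch of the assumed handling, not the exact PR code):

    // The DSv2 read path reuses MapInBatchEvaluatorFactory before custom
    // metrics are wired up, so it passes None and falls back to no metrics.
    val metrics: Map[String, SQLMetric] = pythonMetrics.getOrElse(Map.empty)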

Member Author (@HyukjinKwon):

Here: #44375

*/
@Unstable
def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame = {
DataSource.lookupDataSource(runner, sessionState.conf) match {
Member Author (@HyukjinKwon):

Actually @cloud-fan, that would not work. E.g., if PythonDataSource implements ExternalCommandRunner, we should load it here.

Member Author (@HyukjinKwon):

Let me fix it separately. Reading the code path, I think it more or less won't affect anything.

Contributor:

Let's worry about it when we actually add this ability to the Python data source. We may never add it, for simplicity.

    // instead of `providingClass`.
-   cls.getDeclaredConstructor().newInstance() match {
+   DataSource.newDataSourceInstance(className, cls) match {
      case f: FileDataSourceV2 => f.fallbackFileFormat
Member Author (@HyukjinKwon):

and here too

Member Author (@HyukjinKwon):

and tables.scala as well:

    if (DDLUtils.isDatasourceTable(catalogTable)) {
      DataSource.newDataSourceInstance(
          catalogTable.provider.get,
          DataSource.lookupDataSource(catalogTable.provider.get, conf)) match {
        // For datasource table, this command can only support the following File format.
        // TextFileFormat only default to one column "value"
        // Hive type is already considered as hive serde table, so the logic will not
        // come in here.
        case _: CSVFileFormat | _: JsonFileFormat | _: ParquetFileFormat =>
        case _: JsonDataSourceV2 | _: CSVDataSourceV2 |
             _: OrcDataSourceV2 | _: ParquetDataSourceV2 =>
        case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
        case s =>
          throw QueryCompilationErrors.alterAddColNotSupportDatasourceTableError(s, table)
      }
    }
    catalogTable

Member Author (@HyukjinKwon):

and DataStreamReader:

    val v1DataSource = DataSource(
      sparkSession,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = optionsWithPath.originalMap)
    val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }
    ds match {
      // file source v2 does not support streaming yet.
      case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>

Member Author (@HyukjinKwon):

and DataStreamWriter:

      val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
      val disabledSources =
        Utils.stringToSeq(df.sparkSession.sessionState.conf.disabledV2StreamingWriters)
      val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
        // file source v2 does not support streaming yet.
        classOf[FileDataSourceV2].isAssignableFrom(cls)

      val optionsWithPath = if (path.isEmpty) {
        extraOptions
      } else {
        extraOptions + ("path" -> path.get)
      }

      val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {

}
}

override def supportsExternalMetadata(): Boolean = true
Contributor:

I am actually thinking about whether we should expose this as an API in the Python data source.
If a data source cannot handle external metadata, then .schema(...) or CREATE TABLE table(...) should fail, instead of failing when executing the query.
But I am not sure if this will make the Python API too complicated. WDYT?

Contributor:

For simplicity, I think we can set it to false (the default value) for now. It can actually be difficult to implement a data source that supports a user-specified schema.
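
For context, a sketch of what the flag controls (not this PR's exact code):

    // TableProvider.supportsExternalMetadata tells Spark whether this source
    // accepts a user-specified schema.
    override def supportsExternalMetadata(): Boolean = true
    // With the default of false, spark.read.format(name).schema(s).load()
    // fails when the table is resolved, rather than while executing the
    // query, which is the behavior discussed above.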

      case source if classOf[ExternalCommandRunner].isAssignableFrom(source) =>
        Dataset.ofRows(self, ExternalCommandExecutor(
-         source.getDeclaredConstructor().newInstance()
+         DataSource.newDataSourceInstance(runner, source)
Contributor:

It may be arguable whether this is a breaking change. Now people need to worry about the Python data source in code that is meant to deal with DS v1 only.

Member Author (@HyukjinKwon):

ExternalCommandRunner is a DSv2 API.

Member Author (@HyukjinKwon):

Merged to master.

HyukjinKwon added a commit that referenced this pull request Dec 15, 2023
…taSource.lookupDataSourceV2

### What changes were proposed in this pull request?

This PR is a kind of a followup of #44305 that proposes to create Python Data Source instance at `DataSource.lookupDataSourceV2`

### Why are the changes needed?

Semantically, the instance has to be ready at the `DataSource.lookupDataSourceV2` level instead of after that. It's more consistent as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44374 from HyukjinKwon/SPARK-46423.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Dec 26, 2023
…ation session level

### What changes were proposed in this pull request?

This PR is a followup of #44305. It already works properly at the session level.

### Why are the changes needed?

To remove an unnecessary TODO JIRA.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing CI in this PR should verify this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44487 from HyukjinKwon/SPARK-45600.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@HyukjinKwon HyukjinKwon deleted the SPARK-45597-3 branch January 15, 2024 00:47