
Commit 9e12d9f

do not use RunnableCommand for writing plan
1 parent acf00ec commit 9e12d9f

3 files changed (+27, -19 lines)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (2 additions, 2 deletions)

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Command
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, WriteSupport}
 import org.apache.spark.sql.types.StructType
@@ -241,7 +241,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val writer = ds.createWriter(df.logicalPlan.schema, mode, options)
       if (writer.isPresent) {
         runCommand(df.sparkSession, "save") {
-          WriteToDataSourceV2Command(writer.get(), df.logicalPlan)
+          WriteToDataSourceV2(writer.get(), df.logicalPlan)
         }
       }

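With this change, save() no longer wraps the v2 write in an eagerly executed
command: it builds the WriteToDataSourceV2 logical node and leaves execution to
the planner. A minimal sketch of the user-facing call path, assuming a
hypothetical DataSourceV2 implementation with WriteSupport registered as
"com.example.v2" (everything else is stock Spark API):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("v2-write-sketch")
      .getOrCreate()

    // Saving through a DataSourceV2 that implements WriteSupport now plans
    // WriteToDataSourceV2 (logical) and then WriteToDataSourceV2Exec (physical).
    spark.range(10).toDF("id").write
      .format("com.example.v2") // hypothetical source, not part of this commit
      .mode("append")
      .save()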
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala (4 additions, 7 deletions)

@@ -18,20 +18,17 @@
 package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.sql.Strategy
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.execution.SparkPlan

 object DataSourceV2Strategy extends Strategy {
-  // TODO: write path
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case DataSourceV2Relation(output, reader) =>
       DataSourceV2ScanExec(output, reader) :: Nil

+    case WriteToDataSourceV2(writer, query) =>
+      WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
+
     case _ => Nil
   }
 }
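The new case resolves the "TODO: write path" comment: a Strategy maps a logical
node to candidate physical nodes, returning Nil when it does not apply so the
planner falls through to the next strategy, and planLater(query) inserts a
placeholder that the planner later replaces with the planned child. The same
hook is open to user code through the experimental API; a minimal sketch (the
strategy name is made up, the registration call is stock Spark):

    import org.apache.spark.sql.{SparkSession, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // A strategy that never matches: returning Nil for every plan tells the
    // planner to keep trying the built-in strategies (DataSourceV2Strategy etc.).
    object PassThroughStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.experimental.extraStrategies = PassThroughStrategy :: Nil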
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala (21 additions, 10 deletions)

@@ -19,35 +19,46 @@ package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

-case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
-  extends RunnableCommand {
+/**
+ * The logical plan for writing data into data source v2.
+ */
+case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+}

-  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+/**
+ * The physical plan for writing data into data source v2.
+ */
+case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) extends SparkPlan {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil

-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override protected def doExecute(): RDD[InternalRow] = {
     val writeTask = writer match {
       case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
       case _ => new RowToInternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
     }

-    val rdd = Dataset.ofRows(sparkSession, query).queryExecution.toRdd
+    val rdd = query.execute()
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)

     logInfo(s"Start processing data source writer: $writer. " +
       s"The input RDD has ${messages.length} partitions.")

     try {
-      sparkSession.sparkContext.runJob(
+      sparkContext.runJob(
         rdd,
         (context: TaskContext, iter: Iterator[InternalRow]) =>
           DataWritingSparkTask.run(writeTask, context, iter),
@@ -73,7 +84,7 @@ case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: Logical
       throw new SparkException("Writing job aborted.", cause)
     }

-    Nil
+    sparkContext.emptyRDD
   }
 }
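This is the core of the commit: the eager RunnableCommand, which ran during
planning via run(sparkSession) and re-planned its child with Dataset.ofRows, is
split into a logical node (declaring only children and an empty output) and a
physical node whose doExecute() consumes the already-planned child and returns
an empty RDD[InternalRow]. A minimal sketch of that physical-node contract
against the SparkPlan API of this era, with a made-up node name:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical node following the same contract as WriteToDataSourceV2Exec:
    // run the child plan for its side effects and report no output rows.
    case class SideEffectExec(child: SparkPlan) extends SparkPlan {
      override def children: Seq[SparkPlan] = Seq(child)
      override def output: Seq[Attribute] = Nil

      override protected def doExecute(): RDD[InternalRow] = {
        // Execute the child and discard its rows; this launches the actual job.
        child.execute().foreach(_ => ())
        // Like the write node, hand an empty RDD back to the parent.
        sparkContext.emptyRDD
      }
    }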