Commit 80036ad

data source v2 API refactor: streaming write

1 parent 4dce45a commit 80036ad

39 files changed: +373 additions, -389 deletions

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 25 additions & 17 deletions
@@ -33,7 +33,8 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -47,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamingWriteSupportProvider
     with TableProvider
     with Logging {
   import KafkaSourceProvider._

@@ -180,20 +180,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }

-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    import scala.collection.JavaConverters._
-
-    val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
-    // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
-    val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
-
-    new KafkaStreamingWriteSupport(topic, producerParams, schema)
-  }
-
   private def strategy(caseInsensitiveParams: Map[String, String]) =
     caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
       case ("assign", value) =>

@@ -365,7 +351,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead with SupportsContinuousRead {
+    with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {

     override def name(): String = s"Kafka $strategy"

@@ -374,6 +360,28 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
       override def build(): Scan = new KafkaScan(options)
     }
+
+    override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+      new WriteBuilder with SupportsOutputMode {
+        private var inputSchema: StructType = _
+
+        override def withInputDataSchema(schema: StructType): WriteBuilder = {
+          this.inputSchema = schema
+          this
+        }
+
+        override def outputMode(mode: OutputMode): WriteBuilder = this
+
+        override def buildForStreaming(): StreamingWrite = {
+          import scala.collection.JavaConverters._
+
+          assert(inputSchema != null)
+          val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
+          val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+          new KafkaStreamingWrite(topic, producerParams, inputSchema)
+        }
+      }
+    }
   }

   class KafkaScan(options: DataSourceOptions) extends Scan {
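For orientation, the refactored sink is still reached through the ordinary writeStream API: Spark resolves "kafka" to KafkaSourceProvider, obtains the KafkaTable, and calls buildForStreaming() on the returned WriteBuilder. A minimal usage sketch; the broker address, topic name and checkpoint path are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-sink-sketch").getOrCreate()

// A toy streaming DataFrame with the string `value` column the Kafka sink expects.
val df = spark.readStream
  .format("rate")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// Placeholders: broker address, topic name and checkpoint directory.
val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "events")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()
```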
Lines changed: 4 additions & 4 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**
@@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType
 case object KafkaWriterCommitMessage extends WriterCommitMessage

 /**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
+ * A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory.
  *
  * @param topic The topic this writer is responsible for. If None, topic will be inferred from
  *              a `topic` field in the incoming data.
  * @param producerParams Parameters for Kafka producers in each task.
  * @param schema The schema of the input data.
  */
-class KafkaStreamingWriteSupport(
+class KafkaStreamingWrite(
    topic: Option[String],
    producerParams: ju.Map[String, Object],
    schema: StructType)
-  extends StreamingWriteSupport {
+  extends StreamingWrite {

  validateQuery(schema.toAttributes, producerParams, topic)

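The renamed StreamingWrite keeps the old StreamingWriteSupport contract: produce a StreamingDataWriterFactory for the executors and handle per-epoch commit/abort on the driver. A minimal no-op sketch; NoopStreamingWrite is an invented name, and the createWriter signature is assumed from the surrounding API rather than shown in this diff:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}

// Illustrative sink that accepts every row and throws it away.
class NoopStreamingWrite extends StreamingWrite {

  override def createStreamingWriterFactory(): StreamingDataWriterFactory =
    new StreamingDataWriterFactory {
      override def createWriter(
          partitionId: Int,
          taskId: Long,
          epochId: Long): DataWriter[InternalRow] = new DataWriter[InternalRow] {
        override def write(record: InternalRow): Unit = ()  // discard the row
        override def commit(): WriterCommitMessage = new WriterCommitMessage {}
        override def abort(): Unit = ()
      }
    }

  // Driver-side hooks, called once per epoch with the messages from all tasks.
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
}
```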
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java

Lines changed: 2 additions & 2 deletions
@@ -20,12 +20,12 @@
 import org.apache.spark.annotation.Evolving;

 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * A mix-in interface for {@link TableProvider}. Data sources can implement this interface to
  * propagate session configs with the specified key-prefix to all data source operations in this
  * session.
  */
 @Evolving
-public interface SessionConfigSupport extends DataSourceV2 {
+public interface SessionConfigSupport extends TableProvider {

  /**
   * Key prefix of the session configs to propagate, which is usually the data source name. Spark
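With the parent switched from DataSourceV2 to TableProvider, a source now mixes SessionConfigSupport into its provider class. A hedged sketch with invented names (SimpleSource, the "simple" prefix), assuming Table only requires name and schema and TableProvider exposes getTable(DataSourceOptions):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SessionConfigSupport, Table}
import org.apache.spark.sql.types.{StringType, StructType}

// Session configs under `spark.datasource.simple.*` get forwarded to every
// read/write of this hypothetical source.
class SimpleSource extends SessionConfigSupport {
  override def keyPrefix(): String = "simple"

  override def getTable(options: DataSourceOptions): Table = new Table {
    override def name(): String = "simple"
    override def schema(): StructType = new StructType().add("value", StringType)
  }
}
```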

sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@
  * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
  * with {@link WriteBuilder#buildForBatch()} implemented.
  * </p>
  */
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+
+/**
+ * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
+ * <p>
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * with {@link WriteBuilder#buildForStreaming()} implemented.
+ * </p>
+ */
+@Evolving
+public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { }
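Following the pattern this javadoc describes, a sink table advertises streaming write by mixing in the new interface and returning a WriteBuilder whose buildForStreaming() is implemented. A hedged sketch; BlackHoleTable is invented, and it reuses the NoopStreamingWrite sketched after the Kafka writer diff above:

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical sink table that accepts a single string column and drops every row.
class BlackHoleTable extends SupportsStreamingWrite {
  override def name(): String = "black_hole"
  override def schema(): StructType = new StructType().add("value", StringType)

  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
    new WriteBuilder {
      // Only the streaming path is implemented; buildForBatch() keeps its
      // throwing default from the WriteBuilder interface.
      override def buildForStreaming(): StreamingWrite = new NoopStreamingWrite
    }
}
```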

sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java

Lines changed: 1 addition & 2 deletions
@@ -29,8 +29,7 @@
  * </p>
  */
 @Evolving
-// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely.
-public interface TableProvider extends DataSourceV2 {
+public interface TableProvider {

  /**
   * Return a {@link Table} instance to do read/write with user-specified options.
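With the DataSourceV2 parent gone, a provider is now just a TableProvider. Continuing the illustrative names from above, a provider that serves the BlackHoleTable sketch; getTable(DataSourceOptions) is assumed to be the interface's single method:

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table, TableProvider}

// Hypothetical provider: looks up nothing, always returns the same sink table.
class BlackHoleProvider extends TableProvider {
  override def getTable(options: DataSourceOptions): Table = new BlackHoleTable
}
```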

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java

Lines changed: 8 additions & 1 deletion
@@ -20,6 +20,7 @@
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
 import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
 import org.apache.spark.sql.types.StructType;

 /**
@@ -64,6 +65,12 @@ default WriteBuilder withInputDataSchema(StructType schema) {
   * {@link SupportsSaveMode}.
   */
  default BatchWrite buildForBatch() {
-    throw new UnsupportedOperationException("Batch scans are not supported");
+    throw new UnsupportedOperationException(getClass().getName() +
+      " does not support batch write");
+  }
+
+  default StreamingWrite buildForStreaming() {
+    throw new UnsupportedOperationException(getClass().getName() +
+      " does not support streaming write");
  }
}
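Since both build methods now default to throwing, a builder overrides only the path it supports; asking a streaming-only builder for a batch write fails with the new, class-named message. A small sketch reusing the NoopStreamingWrite from above:

```scala
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

// Streaming-only builder: buildForBatch() is inherited from the interface default.
val streamingOnly: WriteBuilder = new WriteBuilder {
  override def buildForStreaming(): StreamingWrite = new NoopStreamingWrite
}

streamingOnly.buildForStreaming()  // returns the sink
// streamingOnly.buildForBatch()   // would throw UnsupportedOperationException:
//                                 //   "... does not support batch write"
```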

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java

Lines changed: 2 additions & 2 deletions
@@ -20,12 +20,12 @@
 import java.io.Serializable;

 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;

 /**
  * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
  * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
- * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
+ * {@link StreamingWrite#commit(long, WriterCommitMessage[])}.
  *
  * This is an empty interface, data sources should define their own message class and use it when
  * generating messages at executor side and handling the messages at driver side.
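The commit-message flow itself is unchanged; only the driver-side type named in the javadoc is renamed. For illustration (invented names), a source defines its own message class and returns it from each task's DataWriter:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Per-task message carrying how many rows the task wrote; collected on the driver
// by BatchWrite.commit or StreamingWrite.commit.
case class RowCountMessage(rows: Long) extends WriterCommitMessage

class CountingWriter extends DataWriter[InternalRow] {
  private var rows = 0L
  override def write(record: InternalRow): Unit = { rows += 1 }       // a real sink would also persist the row
  override def commit(): WriterCommitMessage = RowCountMessage(rows)  // sent back to the driver
  override def abort(): Unit = ()                                     // nothing to roll back in this sketch
}
```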

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@

 /**
  * A factory of {@link DataWriter} returned by
- * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
+ * {@link StreamingWrite#createStreamingWriterFactory()}, which is responsible for creating
  * and initializing the actual data writer at executor side.
  *
  * Note that, the writer factory will be serialized and sent to executors, then the data writer
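The factory side is untouched apart from the javadoc reference. For completeness, a factory handing out the CountingWriter from the previous sketch; its createWriter signature is assumed from the surrounding API rather than shown in this hunk:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.DataWriter
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory

// Serialized on the driver, deserialized on executors; one writer per (partition, task, epoch).
class CountingWriterFactory extends StreamingDataWriterFactory {
  override def createWriter(
      partitionId: Int,
      taskId: Long,
      epochId: Long): DataWriter[InternalRow] = new CountingWriter
}
```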
