
Commit f938614

address comments
1 parent ca80080 commit f938614

11 files changed, +58 -38 lines


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala

Lines changed: 9 additions & 5 deletions
@@ -73,6 +73,8 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   private val rangeCalculator = KafkaOffsetRangeCalculator(options)
 
+  private var endPartitionOffsets: KafkaSourceOffset = _
+
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -85,11 +87,12 @@ private[kafka010] class KafkaMicroBatchReadSupport(
   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
-    KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
+    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
       latestPartitionOffsets
     })
+    endPartitionOffsets
   }
 
   override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
@@ -153,10 +156,11 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     KafkaMicroBatchReaderFactory
   }
 
-  override def getCustomMetrics(config: ScanConfig): CustomMetrics = {
-    val endPartitionOffsets = config.asInstanceOf[SimpleStreamingScanConfig]
-      .end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets)
+  // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
+  // a parameter.
+  override def getCustomMetrics(): CustomMetrics = {
+    KafkaCustomMetrics(
+      kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
   }
 
   override def deserializeOffset(json: String): Offset = {
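
Note: the substantive change above is that the end offsets computed in latestOffset() are now cached in a field so the parameterless getCustomMetrics() can report them without a ScanConfig. A minimal, self-contained Scala sketch of that pattern; FakeOffset, FakeMetrics and FakeMicroBatchSource are illustrative stand-ins (not Spark or Kafka classes) and the rate limiting is deliberately naive.

object OffsetCachingSketch {

  final case class FakeOffset(partitionToOffsets: Map[Int, Long])
  final case class FakeMetrics(latestOffsets: Map[Int, Long], processedOffsets: Map[Int, Long])

  class FakeMicroBatchSource(
      fetchLatest: () => Map[Int, Long],
      maxOffsetsPerTrigger: Option[Long]) {

    // Cached by latestOffset() so the parameterless getCustomMetrics() can read it later.
    private var endPartitionOffsets: FakeOffset = _

    def latestOffset(start: FakeOffset): FakeOffset = {
      val latest = fetchLatest()
      val limited = maxOffsetsPerTrigger match {
        case Some(max) =>
          // Naive rate limit: advance each partition by at most `max` records per trigger.
          latest.map { case (p, off) =>
            p -> math.min(off, start.partitionToOffsets.getOrElse(p, 0L) + max)
          }
        case None => latest
      }
      endPartitionOffsets = FakeOffset(limited)
      endPartitionOffsets
    }

    def getCustomMetrics(): FakeMetrics =
      FakeMetrics(fetchLatest(), endPartitionOffsets.partitionToOffsets)
  }

  def main(args: Array[String]): Unit = {
    val source = new FakeMicroBatchSource(() => Map(0 -> 100L, 1 -> 80L), Some(10L))
    source.latestOffset(FakeOffset(Map(0 -> 50L, 1 -> 70L)))
    // Prints FakeMetrics(Map(0 -> 100, 1 -> 80),Map(0 -> 60, 1 -> 80))
    println(source.getCustomMetrics())
  }
}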

sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java

Lines changed: 6 additions & 2 deletions
@@ -34,7 +34,10 @@ public interface BatchReadSupportProvider extends DataSourceV2 {
 
   /**
    * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
-   * specified schema.
+   * specified schema, which is called by Spark at the beginning of each batch query.
+   *
+   * Spark will call this method at the beginning of each batch query to create a
+   * {@link BatchReadSupport} instance.
    *
    * By default this method throws {@link UnsupportedOperationException}, implementations should
    * override this method to handle user specified schema.
@@ -48,7 +51,8 @@ default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOpt
   }
 
   /**
-   * Creates a {@link BatchReadSupport} instance to scan the data from this data source.
+   * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
+   * called by Spark at the beginning of each batch query.
    *
    * @param options the options for the returned data source reader, which is an immutable
    *                case-insensitive string-to-string map.
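
Note: the contract being documented is that the provider's factory methods are entry points Spark invokes once at the start of each batch query, and that the schema variant rejects a user specified schema by default. A minimal sketch of that shape using hypothetical stand-in types (SimpleOptions, SimpleSchema, SimpleReadSupport), not the real Spark interfaces.

// Stand-ins for the provider contract described above; these are NOT the real Spark
// interfaces, just a sketch of the shape the Javadoc describes.
object ProviderSketch {
  trait SimpleReadSupport                                      // stands in for BatchReadSupport
  final case class SimpleOptions(entries: Map[String, String]) // stands in for DataSourceOptions
  final case class SimpleSchema(fieldNames: Seq[String])       // stands in for StructType

  trait SimpleBatchReadSupportProvider {
    // Called by the engine at the beginning of each batch query.
    def createBatchReadSupport(options: SimpleOptions): SimpleReadSupport

    // Schema variant: per the Javadoc, sources that do not handle a user specified
    // schema reject it, which is what this default does.
    def createBatchReadSupport(schema: SimpleSchema, options: SimpleOptions): SimpleReadSupport =
      throw new UnsupportedOperationException(
        s"${getClass.getName} does not support user specified schema")
  }

  // A source that relies on the default for the schema variant.
  class ExampleSource extends SimpleBatchReadSupportProvider {
    override def createBatchReadSupport(options: SimpleOptions): SimpleReadSupport =
      new SimpleReadSupport {} // a fresh instance per batch query
  }
}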

sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java

Lines changed: 3 additions & 1 deletion
@@ -35,7 +35,9 @@
 public interface BatchWriteSupportProvider extends DataSourceV2 {
 
   /**
-   * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source.
+   * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
+   * which is called by Spark at the beginning of each batch query.
+   *
    * Data sources can return None if there is no writing needed to be done according to the save
    * mode.
    *

sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java

Lines changed: 3 additions & 2 deletions
@@ -34,7 +34,8 @@ public interface ContinuousReadSupportProvider extends DataSourceV2 {
 
   /**
    * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
-   * source with a user specified schema.
+   * source with a user specified schema, which is called by Spark at the beginning of each
+   * continuous streaming query.
    *
    * By default this method throws {@link UnsupportedOperationException}, implementations should
    * override this method to handle user specified schema.
@@ -55,7 +56,7 @@ default ContinuousReadSupport createContinuousReadSupport(
 
   /**
    * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
-   * source.
+   * source, which is called by Spark at the beginning of each continuous streaming query.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Readers for the same logical source in the same query

sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java

Lines changed: 3 additions & 2 deletions
@@ -34,7 +34,8 @@ public interface MicroBatchReadSupportProvider extends DataSourceV2 {
 
   /**
    * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
-   * source with a user specified schema.
+   * source with a user specified schema, which is called by Spark at the beginning of each
+   * micro-batch streaming query.
    *
    * By default this method throws {@link UnsupportedOperationException}, implementations should
    * override this method to handle user specified schema.
@@ -55,7 +56,7 @@ default MicroBatchReadSupport createMicroBatchReadSupport(
 
   /**
    * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
-   * source.
+   * source, which is called by Spark at the beginning of each micro-batch streaming query.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Readers for the same logical source in the same query

sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,8 @@
 public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
 
   /**
-   * Creates a {@link StreamingWriteSupport} instance to save the data to this data source.
+   * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which is
+   * called by Spark at the beginning of each streaming query.
    *
    * @param queryId A unique string for the writing query. It's possible that there are many
    *                writing queries running at the same time, and the returned

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java

Lines changed: 8 additions & 5 deletions
@@ -24,17 +24,20 @@
  *
  * The execution engine will get an instance of this interface from a data source provider
  * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
- * query, then call {@link #newScanConfigBuilder()} to create an instance of {@link ScanConfig}. The
- * {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
+ * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
+ * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
  * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
- * factory to scan data from the data source.
+ * factory to scan data from the data source with a Spark job.
  */
 @InterfaceStability.Evolving
 public interface BatchReadSupport extends ReadSupport {
 
   /**
-   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
-   * to do operators pushdown, and keep these information in the created {@link ScanConfig}.
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query specific information to do operators pushdown, and keep these
+   * information in the created {@link ScanConfig}.
    *
    * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs
    * to take {@link ScanConfig} as an input.
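
Note: as the class doc describes, the engine asks for one ScanConfig per scan and then passes that same config into the later planning methods. A hedged sketch of that call sequence with stand-in traits; createReaderFactory is an assumed name for the "reader factory" step mentioned in the prose, not necessarily the real method.

// Stand-in traits (not the real Spark API) sketching the call sequence from the class doc.
object BatchScanFlowSketch {
  trait FakeScanConfig
  trait FakeScanConfigBuilder { def build(): FakeScanConfig }
  trait FakeInputPartition
  trait FakeReaderFactory

  trait FakeBatchReadSupport {
    def newScanConfigBuilder(): FakeScanConfigBuilder
    def planInputPartitions(config: FakeScanConfig): Array[FakeInputPartition]
    // Assumed name for the "reader factory" step mentioned in the prose.
    def createReaderFactory(config: FakeScanConfig): FakeReaderFactory
  }

  // Engine-side pseudo-flow for one batch scan.
  def runBatchScan(readSupport: FakeBatchReadSupport): Unit = {
    val config = readSupport.newScanConfigBuilder().build()  // step 1: pushdown ends up in config
    val partitions = readSupport.planInputPartitions(config) // step 2: same config flows back in
    val factory = readSupport.createReaderFactory(config)    // step 3: factory used by the Spark job
    // ... launch a Spark job that reads each partition through the factory ...
  }
}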

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java

Lines changed: 3 additions & 3 deletions
@@ -21,9 +21,9 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * An interface that carries query specific information for the data scan, like operator pushdown
- * information and streaming query offsets. This is defined as an empty interface, and data sources
- * should define their own {@link ScanConfig} classes.
+ * An interface that carries query specific information for the data scanning job, like operator
+ * pushdown information and streaming query offsets. This is defined as an empty interface, and data
+ * sources should define their own {@link ScanConfig} classes.
  *
  * For APIs that take a {@link ScanConfig} as input, like
  * {@link ReadSupport#planInputPartitions(ScanConfig)},
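
Note: since ScanConfig is an empty marker interface, a source typically defines its own immutable config class carrying the pushdown results and, for streaming, the offsets (the Kafka diff above casts to SimpleStreamingScanConfig for exactly this reason). An illustrative, hypothetical example of such a source-defined config.

// Hypothetical source-defined ScanConfig: an immutable value object holding whatever the
// builder decided during pushdown, plus streaming offsets. Field names are illustrative.
object ScanConfigSketch {
  trait ScanConfigLike // stands in for the (empty) ScanConfig marker interface

  final case class MyScanConfig(
      readColumns: Seq[String],             // columns that survived column pruning
      pushedFilters: Seq[String],           // filters the source agreed to evaluate itself
      startOffsets: Option[Map[Int, Long]], // streaming only: per-partition start offsets
      endOffsets: Option[Map[Int, Long]])   // streaming only: per-partition end offsets
    extends ScanConfigLike
}

Methods that receive the marker interface downcast back to the concrete class, which is what the Kafka code above does with config.asInstanceOf[SimpleStreamingScanConfig].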

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java

Lines changed: 9 additions & 7 deletions
@@ -29,20 +29,22 @@
  *
  * The execution engine will get an instance of this interface from a data source provider
  * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
- * streaming query, then call {@link #newScanConfigBuilder(Offset)} to create an instance of
+ * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
  * {@link ScanConfig} for the duration of the streaming query or until
  * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
- * input partitions and reader factory to scan data for its duration. At the end {@link #stop()}
- * will be called when the streaming execution is completed. Note that a single query may have
- * multiple executions due to restart or failure recovery.
+ * input partitions and reader factory to scan data with a Spark job for its duration. At the end
+ * {@link #stop()} will be called when the streaming execution is completed. Note that a single
+ * query may have multiple executions due to restart or failure recovery.
  */
 @InterfaceStability.Evolving
 public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
 
   /**
-   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
-   * to do operators pushdown, streaming offsets, etc., and keep these information in the
-   * created {@link ScanConfig}.
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query specific information to do operators pushdown, store streaming
+   * offsets, etc., and keep these information in the created {@link ScanConfig}.
    *
    * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    * needs to take {@link ScanConfig} as an input.
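
Note: the lifecycle spelled out in the class doc is that one ScanConfig is built at the start and reused until needsReconfiguration(ScanConfig) returns true, and stop() is called once when the streaming execution completes. A hypothetical engine-side loop with stand-in types, purely to make that ordering concrete; this is not Spark's actual scheduler code.

// Hypothetical engine-side loop for the continuous lifecycle; types and the loop itself
// are illustrative stand-ins, not Spark internals.
object ContinuousFlowSketch {
  trait FakeOffset
  trait FakeScanConfig
  trait FakeScanConfigBuilder { def build(): FakeScanConfig }

  trait FakeContinuousReadSupport {
    def newScanConfigBuilder(start: FakeOffset): FakeScanConfigBuilder
    def needsReconfiguration(config: FakeScanConfig): Boolean
    def stop(): Unit
  }

  def runContinuousQuery(
      readSupport: FakeContinuousReadSupport,
      initialStart: FakeOffset,
      latestCommitted: () => FakeOffset, // where the scan would restart from
      keepRunning: () => Boolean): Unit = {
    var start = initialStart
    while (keepRunning()) {
      // One ScanConfig is built and reused for the duration of the long-running scan ...
      val config = readSupport.newScanConfigBuilder(start).build()
      // ... until the source reports that it needs reconfiguration (or the query ends).
      while (keepRunning() && !readSupport.needsReconfiguration(config)) {
        Thread.sleep(100) // placeholder for monitoring the long-running Spark job
      }
      start = latestCommitted()
    }
    readSupport.stop() // called once, when the streaming execution completes
  }
}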

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java

Lines changed: 10 additions & 8 deletions
@@ -27,19 +27,21 @@
  *
  * The execution engine will get an instance of this interface from a data source provider
  * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a
- * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} to create an instance of
- * {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
- * partitions and reader factory to scan a micro-batch. At the end {@link #stop()} will be called
- * when the streaming execution is completed. Note that a single query may have multiple executions
- * due to restart or failure recovery.
+ * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance
+ * of {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
+ * partitions and reader factory to scan a micro-batch with a Spark job. At the end {@link #stop()}
+ * will be called when the streaming execution is completed. Note that a single query may have
+ * multiple executions due to restart or failure recovery.
  */
 @InterfaceStability.Evolving
 public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
 
   /**
-   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
-   * to do operators pushdown, take streaming offsets, etc., and keep these information in the
-   * created {@link ScanConfig}.
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query specific information to do operators pushdown, store streaming
+   * offsets, etc., and keep these information in the created {@link ScanConfig}.
    *
    * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
    * needs to take {@link ScanConfig} as an input.
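
Note: for micro-batch sources the builder is created per micro-batch from a (start, end) offset pair, and the offsets it captures are what end up in the ScanConfig. An illustrative sketch with stand-in types; the pruneColumns hook is hypothetical and only stands in for the "query specific information" the builder might keep.

// Stand-in types sketching the per-micro-batch builder contract; pruneColumns is a
// hypothetical pushdown hook, not a real Spark method.
object MicroBatchBuilderSketch {
  final case class FakeOffset(partitionToOffsets: Map[Int, Long])
  final case class FakeScanConfig(
      start: FakeOffset,
      end: FakeOffset,
      readColumns: Seq[String])

  class FakeScanConfigBuilder(start: FakeOffset, end: FakeOffset) {
    private var columns: Seq[String] = Seq("*")

    // Query specific information gathered before build(); kept in the resulting config.
    def pruneColumns(required: Seq[String]): this.type = { columns = required; this }

    def build(): FakeScanConfig = FakeScanConfig(start, end, columns)
  }

  // What a MicroBatchReadSupport-like source does: Spark asks for a new builder
  // (and therefore a new ScanConfig) for every micro-batch's (start, end) offset range.
  def newScanConfigBuilder(start: FakeOffset, end: FakeOffset): FakeScanConfigBuilder =
    new FakeScanConfigBuilder(start, end)
}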
