@@ -20,45 +20,85 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
-/**
- * A [[DataSourceV2Writer]] that collects results to the driver and prints them in the console.
- * Generated by [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
-    extends DataSourceV2Writer with Logging {
+/** Common methods used to create writes for the console sink */
+trait ConsoleWriter extends Logging {
+
+  def options: DataSourceV2Options
+
   // Number of rows to display, by default 20 rows
-  private val numRowsToShow = options.getInt("numRows", 20)
+  protected val numRowsToShow = options.getInt("numRows", 20)
 
   // Truncate the displayed data if it is too long, by default it is true
-  private val isTruncated = options.getBoolean("truncate", true)
+  protected val isTruncated = options.getBoolean("truncate", true)
 
   assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
+  protected val spark = SparkSession.getActiveSession.get
+
+  def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
 
-  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
+  def abort(messages: Array[WriterCommitMessage]): Unit = {}
 
-  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
-    val batch = messages.collect {
+  protected def printRows(
+      commitMessages: Array[WriterCommitMessage],
+      schema: StructType,
+      printMessage: String): Unit = {
+    val rows = commitMessages.collect {
       case PackedRowCommitMessage(rows) => rows
     }.flatten
 
     // scalastyle:off println
     println("-------------------------------------------")
-    println(s"Batch: $batchId")
+    println(printMessage)
     println("-------------------------------------------")
     // scalastyle:off println
-    spark.createDataFrame(
-      spark.sparkContext.parallelize(batch), schema)
+    spark
+      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
       .show(numRowsToShow, isTruncated)
   }
+}
+
+
+/**
+ * A [[DataSourceV2Writer]] that collects results from a micro-batch query to the driver and
+ * prints them in the console. Created by
+ * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
+ *
+ * This sink should not be used for production, as it requires sending all rows to the driver
+ * and does not support recovery.
+ */
+class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)
+  extends DataSourceV2Writer with ConsoleWriter {
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    printRows(messages, schema, s"Batch: $batchId")
+  }
+
+  override def toString(): String = {
+    s"ConsoleMicroBatchWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+  }
+}
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
 
-  override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+/**
+ * A [[DataSourceV2Writer]] that collects results from a continuous query to the driver and
+ * prints them in the console. Created by
+ * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
+ *
+ * This sink should not be used for production, as it requires sending all rows to the driver
+ * and does not support recovery.
+ */
+class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)
+  extends ContinuousWriter with ConsoleWriter {
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    printRows(messages, schema, s"Continuous processing epoch $epochId")
+  }
+
+  override def toString(): String = {
+    s"ConsoleContinuousWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+  }
 }