@@ -537,21 +537,21 @@ Most of the common operations on DataFrame/Dataset are supported for streaming.
 <div data-lang="scala" markdown="1">

 {% highlight scala %}
-case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
+case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

-val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
+val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
 val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

 // Select the devices which have signal more than 10
 df.select("device").where("signal > 10")      // using untyped APIs
 ds.filter(_.signal > 10).map(_.device)        // using typed APIs

 // Running count of the number of updates for each device type
-df.groupBy("type").count()                    // using untyped API
+df.groupBy("deviceType").count()              // using untyped API

 // Running average signal for each device type
-import org.apache.spark.sql.expressions.scalalang.typed._
-ds.groupByKey(_.type).agg(typed.avg(_.signal))          // using typed API
+import org.apache.spark.sql.expressions.scalalang.typed
+ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
 {% endhighlight %}

 </div>
@@ -565,7 +565,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

 public class DeviceData {
   private String device;
-  private String type;
+  private String deviceType;
   private Double signal;
   private java.sql.Date time;
   ...
@@ -590,13 +590,13 @@ ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
 }, Encoders.STRING());

 // Running count of the number of updates for each device type
-df.groupBy("type").count(); // using untyped API
+df.groupBy("deviceType").count(); // using untyped API

 // Running average signal for each device type
 ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
   @Override
   public String call(DeviceData value) throws Exception {
-    return value.getType();
+    return value.getDeviceType();
   }
 }, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
   @Override
@@ -611,13 +611,13 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
 <div data-lang="python" markdown="1">

 {% highlight python %}
-df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
+df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

 # Select the devices which have signal more than 10
 df.select("device").where("signal > 10")

 # Running count of the number of updates for each device type
-df.groupBy("type").count()
+df.groupBy("deviceType").count()
 {% endhighlight %}
 </div>
 </div>
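For context, a minimal Scala sketch of how the renamed `deviceType` aggregation above could be run end to end; the console sink and complete output mode here are illustrative choices, not part of the change itself:

{% highlight scala %}
// Start the running count as a streaming query; complete mode re-emits the
// whole aggregate table to the console on every trigger.
val query = df.groupBy("deviceType").count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()  // block until the query stops or fails
{% endhighlight %}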
@@ -973,7 +973,7 @@ Here is a table of all the sinks, and the corresponding settings.
   <tr>
     <td><b>File Sink</b></td>
     <td>Append</td>
-    <td><pre>writeStream<br/>  .format("parquet")<br/>  .start()</pre></td>
+    <td><pre>writeStream<br/>  .format("parquet")<br/>  .option(<br/>    "checkpointLocation",<br/>    "path/to/checkpoint/dir")<br/>  .option(<br/>    "path",<br/>    "path/to/destination/dir")<br/>  .start()</pre></td>
     <td>Yes</td>
     <td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
   </tr>
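For context, a minimal Scala sketch of the file sink settings shown above, including partitioning as mentioned in the table note; the `date` partition column is hypothetical:

{% highlight scala %}
// Append new rows as Parquet files; the file sink needs both an output path
// and a checkpoint location.
df.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .partitionBy("date")  // assumes the streaming DataFrame has a "date" column
  .start()
{% endhighlight %}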
@@ -1026,7 +1026,9 @@ noAggDF
 // Write new data to Parquet files
 noAggDF
   .writeStream
-  .parquet("path/to/destination/directory")
+  .format("parquet")
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .option("path", "path/to/destination/dir")
   .start()

 // ========== DF with aggregation ==========
@@ -1066,7 +1068,9 @@ noAggDF
 // Write new data to Parquet files
 noAggDF
   .writeStream()
-  .parquet("path/to/destination/directory")
+  .format("parquet")
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .option("path", "path/to/destination/dir")
   .start();

 // ========== DF with aggregation ==========
@@ -1106,7 +1110,9 @@ noAggDF \
 # Write new data to Parquet files
 noAggDF \
     .writeStream() \
-    .parquet("path/to/destination/directory") \
+    .format("parquet") \
+    .option("checkpointLocation", "path/to/checkpoint/dir") \
+    .option("path", "path/to/destination/dir") \
     .start()

 # ========== DF with aggregation ==========
@@ -1120,11 +1126,11 @@ aggDF \
     .start()

 # Have all the aggregates in an in memory table. The query name will be the table name
-aggDF\
-    .writeStream()\
-    .queryName("aggregates")\
-    .outputMode("complete")\
-    .format("memory")\
+aggDF \
+    .writeStream() \
+    .queryName("aggregates") \
+    .outputMode("complete") \
+    .format("memory") \
     .start()

 spark.sql("select * from aggregates").show()   # interactively query in-memory table
@@ -1159,7 +1165,9 @@ The `StreamingQuery` object created when a query is started can be used to monitor and manage the query.
 {% highlight scala %}
 val query = df.writeStream.format("console").start()   // get the query object

-query.id          // get the unique identifier of the running query
+query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

 query.name        // get the name of the auto-generated or user-specified name
@@ -1169,11 +1177,11 @@ query.stop() // stop the query

 query.awaitTermination()   // block until query is terminated, with stop() or with error

-query.exception()       // the exception if the query has been terminated with error
+query.exception         // the exception if the query has been terminated with error

-query.sourceStatus()    // progress information about data has been read from the input sources
+query.recentProgress    // an array of the most recent progress updates for this query

-query.sinkStatus()      // progress information about data written to the output sink
+query.lastProgress      // the most recent progress update of this streaming query
 {% endhighlight %}

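For context, a minimal Scala sketch of how the new progress APIs above might be inspected; the `StreamingQueryProgress` fields used here (`batchId`, `numInputRows`, `prettyJson`) follow the monitoring section of this guide:

{% highlight scala %}
// lastProgress is the latest StreamingQueryProgress (null before any progress)
if (query.lastProgress != null) {
  println(query.lastProgress.prettyJson)  // JSON summary of the last trigger
}

// recentProgress is an array of the most recent progress updates
query.recentProgress.foreach { p =>
  println(s"batch ${p.batchId}: ${p.numInputRows} input rows")
}
{% endhighlight %}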
@@ -1183,21 +1191,23 @@ query.sinkStatus() // progress information about data written to the output sink
 {% highlight java %}
 StreamingQuery query = df.writeStream().format("console").start();   // get the query object

-query.id();          // get the unique identifier of the running query
+query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

 query.name();        // get the name of the auto-generated or user-specified name

 query.explain();     // print detailed explanations of the query

-query.stop();      // stop the query
+query.stop();        // stop the query

 query.awaitTermination();   // block until query is terminated, with stop() or with error

-query.exception();     // the exception if the query has been terminated with error
+query.exception();       // the exception if the query has been terminated with error

-query.sourceStatus();    // progress information about data has been read from the input sources
+query.recentProgress();  // an array of the most recent progress updates for this query

-query.sinkStatus();      // progress information about data written to the output sink
+query.lastProgress();    // the most recent progress update of this streaming query

 {% endhighlight %}

@@ -1207,7 +1217,9 @@ query.sinkStatus(); // progress information about data written to the output sink
 {% highlight python %}
 query = df.writeStream().format("console").start()   # get the query object

-query.id()       # get the unique identifier of the running query
+query.id()       # get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId()    # get the unique id of this run of the query, which will be generated at every start/restart

 query.name()     # get the name of the auto-generated or user-specified name
@@ -1217,11 +1229,11 @@ query.stop() # stop the query

 query.awaitTermination()   # block until query is terminated, with stop() or with error

-query.exception()       # the exception if the query has been terminated with error
+query.exception()         # the exception if the query has been terminated with error

-query.sourceStatus()    # progress information about data has been read from the input sources
+query.recentProgress()    # an array of the most recent progress updates for this query

-query.sinkStatus()      # progress information about data written to the output sink
+query.lastProgress()      # the most recent progress update of this streaming query

 {% endhighlight %}

@@ -1491,14 +1503,17 @@ spark.streams.addListener(new StreamingQueryListener() {
 {% highlight java %}
 SparkSession spark = ...

-spark.streams.addListener(new StreamingQueryListener() {
-    @Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
+spark.streams().addListener(new StreamingQueryListener() {
+    @Override
+    public void onQueryStarted(QueryStartedEvent queryStarted) {
         System.out.println("Query started: " + queryStarted.id());
     }
-    @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+    @Override
+    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
         System.out.println("Query terminated: " + queryTerminated.id());
     }
-    @Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
+    @Override
+    public void onQueryProgress(QueryProgressEvent queryProgress) {
         System.out.println("Query made progress: " + queryProgress.progress());
     }
 });