
Commit cd18bda

updated structured streaming guide
1 parent f376c37


docs/structured-streaming-programming-guide.md

Lines changed: 70 additions & 27 deletions
@@ -626,52 +626,95 @@ The result tables would look something like the following.
 
 ![Window Operations](img/structured-streaming-window.png)
 
-Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations.
+Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations. You can see the full code for the below examples in
+[Scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala)/
+[Java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java)/
+[Python]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py).
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-// Number of events in every 1 minute time windows
-df.groupBy(window(df.col("time"), "1 minute"))
-  .count()
-
+import spark.implicits._
 
-// Average number of events for each device type in every 1 minute time windows
-df.groupBy(
-   df.col("type"),
-   window(df.col("time"), "1 minute"))
-  .avg("signal")
+// Create DataFrame representing the stream of input lines from connection to host:port
+val lines = spark.readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .option("includeTimestamp", true)
+  .load().as[(String, Timestamp)]
+
+// Split the lines into words, retaining timestamps
+val words = lines.flatMap(line =>
+  line._1.split(" ").map(word => (word, line._2))
+).toDF("word", "timestamp")
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"), $"word"
+).count().orderBy("window")
 {% endhighlight %}
 
 </div>
 <div data-lang="java" markdown="1">
 
 {% highlight java %}
-import static org.apache.spark.sql.functions.window;
-
-// Number of events in every 1 minute time windows
-df.groupBy(window(df.col("time"), "1 minute"))
-  .count();
-
-// Average number of events for each device type in every 1 minute time windows
-df.groupBy(
-   df.col("type"),
-   window(df.col("time"), "1 minute"))
-  .avg("signal");
-
+// Create DataFrame representing the stream of input lines from connection to host:port
+Dataset<Tuple2<String, Timestamp>> lines = spark
+  .readStream()
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .option("includeTimestamp", true)
+  .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+
+// Split the lines into words, retaining timestamps
+Dataset<Row> words = lines.flatMap(
+  new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+    @Override
+    public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+      List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+      for (String word : t._1.split(" ")) {
+        result.add(new Tuple2<>(word, t._2));
+      }
+      return result.iterator();
+    }
+  },
+  Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+).toDF("word", "timestamp");
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count().orderBy("window");
 {% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
-from pyspark.sql.functions import window
+# Create DataFrame representing the stream of input lines from connection to host:port
+lines = spark\
+    .readStream\
+    .format('socket')\
+    .option('host', 'localhost')\
+    .option('port', 9999)\
+    .option('includeTimestamp', 'true')\
+    .load()
 
-# Number of events in every 1 minute time windows
-df.groupBy(window("time", "1 minute")).count()
+# Split the lines into words, retaining timestamps
+# split() splits each line into an array, and explode() turns the array into multiple rows
+words = lines.select(
+    explode(split(lines.value, ' ')).alias('word'),
+    lines.timestamp
+)
 
-# Average number of events for each device type in every 1 minute time windows
-df.groupBy("type", window("time", "1 minute")).avg("signal")
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, '10 minutes', '5 minutes'),
+    words.word
+).count().orderBy('window')
 {% endhighlight %}
 
 </div>
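
The diff above defines `windowedCounts` in each language but stops before the query is started. For reference, a minimal sketch (in Scala) of how such a query could be started; the console sink, `truncate` option, and complete output mode follow the linked full examples and are assumptions here, not part of this diff:

{% highlight scala %}
// Sketch only: write the continually updated windowed counts to the console.
// Complete output mode is used because the query sorts an aggregated result,
// and sorting a streaming aggregation is only supported in complete mode.
val query = windowedCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", "false") // keep full window column values visible
  .start()

query.awaitTermination()
{% endhighlight %}

With input supplied over the socket (for example via `nc -lk 9999`), each trigger would print per-word counts for every 10-minute window, sliding by 5 minutes.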
