Commit 4342efb

responded to comments
1 parent cd18bda

File tree

1 file changed: +3 -50 lines changed
docs/structured-streaming-programming-guide.md

Lines changed: 3 additions & 50 deletions
@@ -637,18 +637,7 @@ Since this windowing is similar to grouping, in code, you can use `groupBy()` an
 {% highlight scala %}
 import spark.implicits._

-// Create DataFrame representing the stream of input lines from connection to host:port
-val lines = spark.readStream
-  .format("socket")
-  .option("host", "localhost")
-  .option("port", 9999)
-  .option("includeTimestamp", true)
-  .load().as[(String, Timestamp)]
-
-// Split the lines into words, retaining timestamps
-val words = lines.flatMap(line =>
-  line._1.split(" ").map(word => (word, line._2))
-).toDF("word", "timestamp")
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

 // Group the data by window and word and compute the count of each group
 val windowedCounts = words.groupBy(
@@ -660,29 +649,7 @@ val windowedCounts = words.groupBy(
 <div data-lang="java" markdown="1">

 {% highlight java %}
-// Create DataFrame representing the stream of input lines from connection to host:port
-Dataset<Tuple2<String, Timestamp>> lines = spark
-  .readStream()
-  .format("socket")
-  .option("host", "localhost")
-  .option("port", 9999)
-  .option("includeTimestamp", true)
-  .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
-
-// Split the lines into words, retaining timestamps
-Dataset<Row> words = lines.flatMap(
-  new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-    @Override
-    public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-      List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-      for (String word : t._1.split(" ")) {
-        result.add(new Tuple2<>(word, t._2));
-      }
-      return result.iterator();
-    }
-  },
-  Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
-).toDF("word", "timestamp");
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

 // Group the data by window and word and compute the count of each group
 Dataset<Row> windowedCounts = words.groupBy(
@@ -694,21 +661,7 @@ Dataset<Row> windowedCounts = words.groupBy(
 </div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# Create DataFrame representing the stream of input lines from connection to host:port
-lines = spark\
-    .readStream\
-    .format('socket')\
-    .option('host', 'localhost')\
-    .option('port', 9999)\
-    .option('includeTimestamp', 'true')\
-    .load()
-
-# Split the lines into words, retaining timestamps
-# split() splits each line into an array, and explode() turns the array into multiple rows
-words = lines.select(
-    explode(split(lines.value, ' ')).alias('word'),
-    lines.timestamp
-)
+words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }

 # Group the data by window and word and compute the count of each group
 windowedCounts = words.groupBy(
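
Each hunk's trailing context stops mid-statement at `words.groupBy(`. For reference, a minimal sketch of the complete windowed aggregation that context leads into, in the Scala API; the 10-minute window with a 5-minute slide is an assumption for illustration, since the actual durations fall outside the diff:

{% highlight scala %}
import org.apache.spark.sql.functions.window
import spark.implicits._

// words: streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group.
// window() buckets each row's event-time timestamp into overlapping
// 10-minute windows that slide every 5 minutes (durations assumed here).
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
{% endhighlight %}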
