Commit 2859ea3

[SPARK-16020][SQL] Fix complete mode aggregation with console sink
## What changes were proposed in this pull request?

We cannot use `limit` on a DataFrame in ConsoleSink because it will use the wrong planner. This PR instead collects the `DataFrame` and calls `show` on a batch DataFrame built from the collected rows. This is fine since ConsoleSink is only for debugging.

## How was this patch tested?

Manually confirmed that ConsoleSink now works with complete mode aggregation.

Author: Shixiong Zhu <[email protected]>

Closes #13740 from zsxwing/complete-console.

(cherry picked from commit d0ac0e6)
Signed-off-by: Shixiong Zhu <[email protected]>
Parent: 8b7e561

File tree: 3 files changed (+105, −1 lines)
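
For context before the diffs, here is a minimal sketch of the kind of query this commit fixes: a complete-mode streaming aggregation written to the console sink. The sketch is illustrative only and not part of the patch — the socket source, host/port, and object name are all assumptions.

```scala
// Illustrative only: a complete-mode aggregation streamed to the console
// sink. Before this fix, ConsoleSink called `show` directly on the streaming
// DataFrame; `show` plans a `limit`, which used the wrong planner and could
// print incorrect results.
import org.apache.spark.sql.SparkSession

object ConsoleCompleteModeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("console-complete-mode")
      .master("local[2]") // assumption: local debugging run
      .getOrCreate()
    import spark.implicits._

    // Assumption: a line-based socket source on localhost:9999.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val counts = lines.as[String].groupBy("value").count()

    counts.writeStream
      .format("console")
      .outputMode("complete") // the mode SPARK-16020 fixes
      .start()
      .awaitTermination()
  }
}
```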

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala

Lines changed: 3 additions & 0 deletions
@@ -30,6 +30,9 @@ trait Sink {
    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
    * this method is called more than once with the same batchId (which will happen in the case of
    * failures), then `data` should only be added once.
+   *
+   * Note: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
+   * Otherwise, you may get a wrong result.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
 }
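
The new note is effectively a contract for custom sinks: `addBatch` may only consume `data` (e.g., via `collect` or `foreach`) and must never apply further operators to it. Below is a minimal sketch of a conforming implementation against the `Sink` trait above — the class name and the println body are hypothetical, not from this commit.

```scala
// Hypothetical sink that honors the documented contract: it only consumes
// `data` via `collect`, and never applies operators such as `limit`, `sort`,
// or `show` to the streaming DataFrame itself.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class PrintingSink extends Sink {
  private var lastBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // `addBatch` can be re-invoked with the same batchId after a failure;
    // only add each batch once.
    if (batchId > lastBatchId) {
      data.collect().foreach(row => println(row.mkString(", ")))
      lastBatchId = batchId
    }
  }
}
```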

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala

Lines changed: 3 additions & 1 deletion
@@ -45,7 +45,9 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.show(numRowsToShow, isTruncated)
+    data.sparkSession.createDataFrame(
+      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+      .show(numRowsToShow, isTruncated)
   }
 }
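
The replacement round-trips the rows through the driver: `collect` the streaming batch, then rebuild an ordinary batch DataFrame with the same schema, so that `show` (which plans a `limit` internally) runs on the batch planner. A standalone sketch of that pattern follows — the object and method names are hypothetical.

```scala
import org.apache.spark.sql.DataFrame

object BatchShow {
  // Hypothetical helper mirroring the ConsoleSink change: materialize the
  // streaming batch on the driver, rebuild a batch DataFrame with the same
  // schema, and only then call `show`.
  def showAsBatch(data: DataFrame, numRows: Int = 20, truncate: Boolean = true): Unit = {
    val spark = data.sparkSession
    val batchDF = spark.createDataFrame(
      spark.sparkContext.parallelize(data.collect()), data.schema)
    batchDF.show(numRows, truncate)
  }
}
```

Materializing every batch on the driver would be unacceptable for a production sink, but as the commit message notes, ConsoleSink exists only for debugging.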

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-16020 Complete mode aggregation with console sink") {
+    withTempDir { checkpointLocation =>
+      val origOut = System.out
+      val stdout = new ByteArrayOutputStream()
+      try {
+        // Hook Java System.out.println
+        System.setOut(new PrintStream(stdout))
+        // Hook Scala println
+        Console.withOut(stdout) {
+          val input = MemoryStream[String]
+          val df = input.toDF().groupBy("value").count()
+          val query = df.writeStream
+            .format("console")
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointLocation.getAbsolutePath)
+            .start()
+          input.addData("a")
+          query.processAllAvailable()
+          input.addData("a", "b")
+          query.processAllAvailable()
+          input.addData("a", "b", "c")
+          query.processAllAvailable()
+          query.stop()
+        }
+        System.out.flush()
+      } finally {
+        System.setOut(origOut)
+      }
+
+      val expected = """-------------------------------------------
+        |Batch: 0
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 1
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    2|
+        ||    b|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 2
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    3|
+        ||    b|    2|
+        ||    c|    1|
+        |+-----+-----+
+        |
+        |""".stripMargin
+      assert(expected === new String(stdout.toByteArray, UTF_8))
+    }
+  }
+
+}
