We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 50cf3e6 commit ac9009fCopy full SHA for ac9009f
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -253,6 +253,21 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
253
query.stop()
254
}
255
256
+
257
+ test("foreach sink should support metrics") {
258
+ val inputData = MemoryStream[Int]
259
+ val query = inputData.toDS()
260
+ .writeStream
261
+ .foreach(new TestForeachWriter())
262
+ .start()
263
+ try {
264
+ inputData.addData(10, 11, 12)
265
+ query.processAllAvailable()
266
+ assert(query.lastProgress.numInputRows === 3)
267
+ } finally {
268
+ query.stop()
269
+ }
270
271
272
273
/** A global object to collect events in the executor */
0 commit comments