Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
AssertOnQuery { query =>
- val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+ val recordsRead = query.recentProgress.map(_.numInputRows).sum
recordsRead == 3
}
)
Expand Down
2 changes: 1 addition & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),

Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/sql/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ def status(self):

@property
@since(2.1)
- def recentProgresses(self):
+ def recentProgress(self):
"""Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
The number of progress updates retained for each stream is configured by Spark session
- configuration `spark.sql.streaming.numRecentProgresses`.
+ configuration `spark.sql.streaming.numRecentProgressUpdates`.
"""
- return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+ return [json.loads(p.json()) for p in self._jsq.recentProgress()]

@property
@since(2.1)
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,11 +1116,11 @@ def test_stream_status_and_progress(self):
try:
q.processAllAvailable()
lastProgress = q.lastProgress
- recentProgresses = q.recentProgresses
+ recentProgress = q.recentProgress
status = q.status
self.assertEqual(lastProgress['name'], q.name)
self.assertEqual(lastProgress['id'], q.id)
- self.assertTrue(any(p == lastProgress for p in recentProgresses))
+ self.assertTrue(any(p == lastProgress for p in recentProgress))
self.assertTrue(
"message" in status and
"isDataAvailable" in status and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ trait ProgressReporter extends Logging {
def status: StreamingQueryStatus = currentStatus

/** Returns an array containing the most recent query progress updates. */
- def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+ def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
progressBuffer.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ object SQLConf {
.createWithDefault(false)

val STREAMING_PROGRESS_RETENTION =
- SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+ SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates")
.doc("The number of progress updates to retain for a streaming query")
.intConf
.createWithDefault(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ trait StreamingQuery {
/**
* Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
* The number of progress updates retained for each stream is configured by Spark session
- * configuration `spark.sql.streaming.numRecentProgresses`.
+ * configuration `spark.sql.streaming.numRecentProgressUpdates`.
*
* @since 2.1.0
*/
- def recentProgresses: Array[StreamingQueryProgress]
+ def recentProgress: Array[StreamingQueryProgress]

/**
* Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
try {
inputData.addData(10, 11, 12)
query.processAllAvailable()
- val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption
+ val recentProgress = query.recentProgress.filter(_.numInputRows != 0).headOption
assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3,
- s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics")
+ s"recentProgress[${query.recentProgress.toList}] doesn't contain correct metrics")
} finally {
query.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
AddTextFileData("100", src, tmp),
CheckAnswer("100"),
AssertOnQuery { query =>
- val actualProgress = query.recentProgresses
+ val actualProgress = query.recentProgress
.find(_.numInputRows > 0)
.getOrElse(sys.error("Could not find records with data."))
assert(actualProgress.numInputRows === 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
true
}
- // `recentProgresses` should not receive too many no data events
+ // `recentProgress` should not receive too many no data events
actions += AssertOnQuery { q =>
- q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+ q.recentProgress.size > 1 && q.recentProgress.size <= 11
}
testStream(input.toDS)(actions: _*)
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}

- testQuietly("status, lastProgress, and recentProgresses") {
+ testQuietly("status, lastProgress, and recentProgress") {
import StreamingQuerySuite._
clock = new StreamManualClock

Expand Down Expand Up @@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

// Test status and progress while offset is being fetched
AddData(inputData, 1, 2),
Expand All @@ -210,22 +210,22 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

// Test status and progress while batch is being fetched
AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

// Test status and progress while batch is being processed
AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
- AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+ AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

// Test status and progress while batch processing has completed
AdvanceManualClock(500), // time = 1100 to unblock job
Expand All @@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.message === "Waiting for next trigger"),
AssertOnQuery { query =>
assert(query.lastProgress != null)
- assert(query.recentProgresses.exists(_.numInputRows > 0))
- assert(query.recentProgresses.last.eq(query.lastProgress))
+ assert(query.recentProgress.exists(_.numInputRows > 0))
+ assert(query.recentProgress.last.eq(query.lastProgress))

val progress = query.lastProgress
assert(progress.id === query.id)
Expand Down Expand Up @@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
AssertOnQuery { query =>
- assert(query.recentProgresses.last.eq(query.lastProgress))
+ assert(query.recentProgress.last.eq(query.lastProgress))
assert(query.lastProgress.batchId === 1)
assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
true
Expand Down Expand Up @@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
try {
val q = streamingDF.writeStream.format("memory").queryName("test").start()
q.processAllAvailable()
- q.recentProgresses.head
+ q.recentProgress.head
} finally {
spark.streams.active.map(_.stop())
}
Expand Down