From efececf49069212761af077c32caaabfd94c2066 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 6 Dec 2016 17:24:39 -0800 Subject: [PATCH 1/2] [SPARK-18754][SQL] Rename recentProgresses to recentProgress --- .../spark/sql/kafka010/KafkaSourceSuite.scala | 2 +- project/MimaExcludes.scala | 2 +- python/pyspark/sql/streaming.py | 6 +++--- python/pyspark/sql/tests.py | 4 ++-- .../execution/streaming/ProgressReporter.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/streaming/StreamingQuery.scala | 4 ++-- .../execution/streaming/ForeachSinkSuite.scala | 4 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../StreamingQueryListenerSuite.scala | 4 ++-- .../sql/streaming/StreamingQuerySuite.scala | 18 +++++++++--------- 11 files changed, 25 insertions(+), 25 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 2d6ccb22ddb06..c598b646b2306 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -447,7 +447,7 @@ class KafkaSourceSuite extends KafkaSourceTest { AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), AssertOnQuery { query => - val recordsRead = query.recentProgresses.map(_.numInputRows).sum + val recordsRead = query.recentProgress.map(_.numInputRows).sum recordsRead == 3 } ) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 82d50f9891d9f..b215d8867d2f4 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -91,7 +91,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"), diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ee7a26d00df4b..6b417de4ba021 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -114,12 +114,12 @@ def status(self): @property @since(2.1) - def recentProgresses(self): + def recentProgress(self): """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session - configuration `spark.sql.streaming.numRecentProgresses`. + configuration `spark.sql.streaming.numrecentProgress`. """ - return [json.loads(p.json()) for p in self._jsq.recentProgresses()] + return [json.loads(p.json()) for p in self._jsq.recentProgress()] @property @since(2.1) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66a3490a640ba..50df68b14483d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1116,11 +1116,11 @@ def test_stream_status_and_progress(self): try: q.processAllAvailable() lastProgress = q.lastProgress - recentProgresses = q.recentProgresses + recentProgress = q.recentProgress status = q.status self.assertEqual(lastProgress['name'], q.name) self.assertEqual(lastProgress['id'], q.id) - self.assertTrue(any(p == lastProgress for p in recentProgresses)) + self.assertTrue(any(p == lastProgress for p in recentProgress)) self.assertTrue( "message" in status and "isDataAvailable" in status and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index d95f55267e142..e57a996cd5c21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -90,7 +90,7 @@ trait ProgressReporter extends Logging { def status: StreamingQueryStatus = currentStatus /** Returns an array containing the most recent query progress updates. */ - def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized { + def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized { progressBuffer.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5b45df69e6791..ea5127baa88a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -617,7 +617,7 @@ object SQLConf { .createWithDefault(false) val STREAMING_PROGRESS_RETENTION = - SQLConfigBuilder("spark.sql.streaming.numRecentProgresses") + SQLConfigBuilder("spark.sql.streaming.numrecentProgress") .doc("The number of progress updates to retain for a streaming query") .intConf .createWithDefault(100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 1794e75462cfd..3a06427a2cf87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -87,11 +87,11 @@ trait StreamingQuery { /** * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. * The number of progress updates retained for each stream is configured by Spark session - * configuration `spark.sql.streaming.numRecentProgresses`. + * configuration `spark.sql.streaming.numrecentProgress`. * * @since 2.1.0 */ - def recentProgresses: Array[StreamingQueryProgress] + def recentProgress: Array[StreamingQueryProgress] /** * Returns the most recent [[StreamingQueryProgress]] update of this streaming query. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index 4a3eeb70b1702..9137d650e906b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf try { inputData.addData(10, 11, 12) query.processAllAvailable() - val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption + val recentProgress = query.recentProgress.filter(_.numInputRows != 0).headOption assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3, - s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics") + s"recentProgress[${query.recentProgress.toList}] doesn't contain correct metrics") } finally { query.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index ff1f3e26f1593..7b6fe83b9a597 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddTextFileData("100", src, tmp), CheckAnswer("100"), AssertOnQuery { query => - val actualProgress = query.recentProgresses + val actualProgress = query.recentProgress .find(_.numInputRows > 0) .getOrElse(sys.error("Could not find records with data.")) assert(actualProgress.numInputRows === 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 1cd503c6de696..b78d1353e8dcb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } true } - // `recentProgresses` should not receive too many no data events + // `recentProgress` should not receive too many no data events actions += AssertOnQuery { q => - q.recentProgresses.size > 1 && q.recentProgresses.size <= 11 + q.recentProgress.size > 1 && q.recentProgress.size <= 11 } testStream(input.toDS)(actions: _*) spark.sparkContext.listenerBus.waitUntilEmpty(10000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 893cb762c6580..bc8499d3acd69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { ) } - testQuietly("status, lastProgress, and recentProgresses") { + testQuietly("status, lastProgress, and recentProgress") { import StreamingQuerySuite._ clock = new StreamManualClock @@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while offset is being fetched AddData(inputData, 1, 2), @@ -210,7 +210,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message.startsWith("Getting offsets from")), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being fetched AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch @@ -218,14 +218,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being processed AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch processing has completed AdvanceManualClock(500), // time = 1100 to unblock job @@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery { query => assert(query.lastProgress != null) - assert(query.recentProgresses.exists(_.numInputRows > 0)) - assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.recentProgress.exists(_.numInputRows > 0)) + assert(query.recentProgress.last.eq(query.lastProgress)) val progress = query.lastProgress assert(progress.id === query.id) @@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery { query => - assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.recentProgress.last.eq(query.lastProgress)) assert(query.lastProgress.batchId === 1) assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818) true @@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { try { val q = streamingDF.writeStream.format("memory").queryName("test").start() q.processAllAvailable() - q.recentProgresses.head + q.recentProgress.head } finally { spark.streams.active.map(_.stop()) } From 184a6d182b84ad297c7bbff65362a703dbbad2b1 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 6 Dec 2016 17:23:19 -0800 Subject: [PATCH 2/2] update config option too --- python/pyspark/sql/streaming.py | 2 +- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../scala/org/apache/spark/sql/streaming/StreamingQuery.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 6b417de4ba021..9cfb3fe25cdcc 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -117,7 +117,7 @@ def status(self): def recentProgress(self): """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session - configuration `spark.sql.streaming.numrecentProgress`. + configuration `spark.sql.streaming.numRecentProgressUpdates`. """ return [json.loads(p.json()) for p in self._jsq.recentProgress()] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ea5127baa88a2..91f3fe0fe9549 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -617,7 +617,7 @@ object SQLConf { .createWithDefault(false) val STREAMING_PROGRESS_RETENTION = - SQLConfigBuilder("spark.sql.streaming.numrecentProgress") + SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates") .doc("The number of progress updates to retain for a streaming query") .intConf .createWithDefault(100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 3a06427a2cf87..596bd90140cc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -87,7 +87,7 @@ trait StreamingQuery { /** * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. * The number of progress updates retained for each stream is configured by Spark session - * configuration `spark.sql.streaming.numrecentProgress`. + * configuration `spark.sql.streaming.numRecentProgressUpdates`. * * @since 2.1.0 */