From e1665bec0c9ce3f923312bc7d1953e9e28ed6c6f Mon Sep 17 00:00:00 2001 From: Changgyoo Park Date: Wed, 18 Sep 2024 09:59:22 +0200 Subject: [PATCH 1/2] Fix --- .../sql/connect/planner/SparkConnectServiceSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index 579fdb47aef3..1bded17b144f 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -873,8 +873,11 @@ class SparkConnectServiceSuite val listenerBus = sparkContext.listenerBus val LISTENER_BUS_TIMEOUT = 30000 def executeHolder: ExecuteHolder = { - assert(listener.executeHolder.isDefined) - listener.executeHolder.get + // An ExecuteHolder will eventually be set + Eventually.eventually(timeout(1.seconds)) { + assert(listener.executeHolder.isDefined) + listener.executeHolder.get + } } def onNext(v: proto.ExecutePlanResponse): Unit = { if (v.hasSchema) { From ff683fd1a54aa993fa678fbf2c11dbbd5dded24a Mon Sep 17 00:00:00 2001 From: Changgyoo Park Date: Wed, 18 Sep 2024 20:02:30 +0200 Subject: [PATCH 2/2] Address review feedback --- .../planner/SparkConnectServiceSuite.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index 1bded17b144f..62146f19328a 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -871,11 +871,14 @@ class SparkConnectServiceSuite class VerifyEvents(val sparkContext: SparkContext) { val listener: MockSparkListener = new MockSparkListener() val listenerBus = sparkContext.listenerBus + val EVENT_WAIT_TIMEOUT = timeout(10.seconds) val LISTENER_BUS_TIMEOUT = 30000 def executeHolder: ExecuteHolder = { - // An ExecuteHolder will eventually be set - Eventually.eventually(timeout(1.seconds)) { - assert(listener.executeHolder.isDefined) + // An ExecuteHolder shall be set eventually through MockSparkListener + Eventually.eventually(EVENT_WAIT_TIMEOUT) { + assert( + listener.executeHolder.isDefined, + s"No events have been posted in $EVENT_WAIT_TIMEOUT") listener.executeHolder.get } } @@ -894,8 +897,10 @@ class SparkConnectServiceSuite def onCompleted(producedRowCount: Option[Long] = None): Unit = { assert(executeHolder.eventsManager.getProducedRowCount == producedRowCount) // The eventsManager is closed asynchronously - Eventually.eventually(timeout(1.seconds)) { - assert(executeHolder.eventsManager.status == ExecuteStatus.Closed) + Eventually.eventually(EVENT_WAIT_TIMEOUT) { + assert( + executeHolder.eventsManager.status == ExecuteStatus.Closed, + s"Execution has not been completed in $EVENT_WAIT_TIMEOUT") } } def onCanceled(): Unit = {