diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 579fdb47aef3..62146f19328a 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -871,10 +871,16 @@ class SparkConnectServiceSuite
   class VerifyEvents(val sparkContext: SparkContext) {
     val listener: MockSparkListener = new MockSparkListener()
     val listenerBus = sparkContext.listenerBus
+    val EVENT_WAIT_TIMEOUT = timeout(10.seconds)
     val LISTENER_BUS_TIMEOUT = 30000
     def executeHolder: ExecuteHolder = {
-      assert(listener.executeHolder.isDefined)
-      listener.executeHolder.get
+      // An ExecuteHolder shall be set eventually through MockSparkListener
+      Eventually.eventually(EVENT_WAIT_TIMEOUT) {
+        assert(
+          listener.executeHolder.isDefined,
+          s"No events have been posted in $EVENT_WAIT_TIMEOUT")
+        listener.executeHolder.get
+      }
     }
     def onNext(v: proto.ExecutePlanResponse): Unit = {
       if (v.hasSchema) {
@@ -891,8 +897,10 @@ class SparkConnectServiceSuite
     def onCompleted(producedRowCount: Option[Long] = None): Unit = {
      assert(executeHolder.eventsManager.getProducedRowCount == producedRowCount)
       // The eventsManager is closed asynchronously
-      Eventually.eventually(timeout(1.seconds)) {
-        assert(executeHolder.eventsManager.status == ExecuteStatus.Closed)
+      Eventually.eventually(EVENT_WAIT_TIMEOUT) {
+        assert(
+          executeHolder.eventsManager.status == ExecuteStatus.Closed,
+          s"Execution has not been completed in $EVENT_WAIT_TIMEOUT")
       }
     }
     def onCanceled(): Unit = {
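
For context, the change above replaces one-shot assertions on asynchronously updated state with ScalaTest's `Eventually.eventually` polling under a shared `EVENT_WAIT_TIMEOUT`. The sketch below is a minimal, self-contained illustration of that retry pattern only; the object name `EventuallyRetrySketch` and its `eventSeen` flag are hypothetical and are not part of the Spark Connect test suite.

```scala
// A minimal sketch of the ScalaTest `Eventually` polling pattern adopted by the patch.
// `EventuallyRetrySketch` and `eventSeen` are illustrative names, not Spark code.
import org.scalatest.Assertions
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

object EventuallyRetrySketch extends Eventually with Assertions {
  // Stand-in for state that is populated asynchronously, like the listener events above.
  @volatile private var eventSeen = false

  def main(args: Array[String]): Unit = {
    // Simulate an asynchronous callback (e.g. a listener-bus event) arriving later.
    new Thread(() => { Thread.sleep(500); eventSeen = true }).start()

    // Retry the assertion until it passes or the timeout elapses, instead of asserting
    // once and racing against the asynchronous producer.
    eventually(timeout(10.seconds), interval(100.millis)) {
      assert(eventSeen, "the asynchronous event was not observed within the timeout")
    }
    println("event observed")
  }
}
```

Keeping the timeout in a single named value, as the patch does with `EVENT_WAIT_TIMEOUT`, also makes the wait budget consistent across all assertions and easy to tune in one place.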