From 9f7066e4c9f83476012d896b39844d9ee395bd3e Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 4 Jan 2018 15:03:45 -0800
Subject: [PATCH 01/12] fail if attempt number isn't 0

---
 .../streaming/continuous/ContinuousDataSourceRDDIter.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
index d79e4bd65f56..777064fbcf61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
@@ -52,6 +52,10 @@ class ContinuousDataSourceRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    if (context.attemptNumber() != 0) {
+      throw new SparkException("Continuous processing does not support task retry")
+    }
+
     val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
 
     val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)

From f641be015be4848747ae88265389a5be2e763d00 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 4 Jan 2018 16:54:07 -0800
Subject: [PATCH 02/12] capture task retry and convert it to global retry

---
 .../ContinuousDataSourceRDDIter.scala      |  2 +-
 .../continuous/ContinuousExecution.scala   | 12 +++--
 .../ContinuousTaskRetryException.scala     | 23 ++++++++
 .../continuous/ContinuousSuite.scala       | 53 +++++++++++--------
 4 files changed, 63 insertions(+), 27 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
index 777064fbcf61..b7338dcc4a36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
@@ -53,7 +53,7 @@ class ContinuousDataSourceRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
     if (context.attemptNumber() != 0) {
-      throw new SparkException("Continuous processing does not support task retry")
+      throw new ContinuousTaskRetryException()
     }
 
     val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 9657b5e26d77..1794194930eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap,
   CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -82,11 +82,13 @@ class ContinuousExecution(
       try {
         runContinuous(sparkSessionForStream)
       } catch {
-        case _: InterruptedException if state.get().equals(RECONFIGURING) =>
-          // swallow exception and run again
-          state.set(ACTIVE)
+        // Capture task retries (transformed to an exception by ContinuousDataSourceRDDIter) and
+        // convert them to global retries by letting the while loop spin.
+        case s: SparkException
+          if s.getCause != null && s.getCause.getCause != null &&
+            s.getCause.getCause.isInstanceOf[ContinuousTaskRetryException] => ()
       }
-    } while (state.get() == ACTIVE)
+    } while (state.updateAndGet(stateUpdate) == ACTIVE)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala
new file mode 100644
index 000000000000..0bf004053bbc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark.SparkException
+
+class ContinuousTaskRetryException
+  extends SparkException("Continuous execution does not support task retry", null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 9562c10feafe..14a031194004 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -17,36 +17,16 @@
 
 package org.apache.spark.sql.streaming.continuous
 
-import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
-
-import scala.reflect.ClassTag
-import scala.util.control.ControlThrowable
-
-import com.google.common.util.concurrent.UncheckedExecutionException
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.spark.{SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.Range
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
-import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
-import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.TestSparkSession
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 class ContinuousSuiteBase extends StreamTest {
   // We need more than the default local[2] to be able to schedule all partitions simultaneously.
@@ -219,6 +199,37 @@ class ContinuousSuite extends ContinuousSuiteBase {
     StopStream)
   }
 
+  test("kill task") {
+    val df = spark.readStream
+      .format("rate")
+      .option("numPartitions", "5")
+      .option("rowsPerSecond", "5")
+      .load()
+      .select('value)
+
+    // Get an arbitrary task from this query to kill. It doesn't matter which one.
+    var taskId: Long = -1
+    val listener = new SparkListener() {
+      override def onTaskStart(start: SparkListenerTaskStart): Unit = {
+        taskId = start.taskInfo.taskId
+      }
+    }
+    spark.sparkContext.addSparkListener(listener)
+
+    testStream(df, useV2Sink = true)(
+      StartStream(Trigger.Continuous(100)),
+      Execute(waitForRateSourceTriggers(_, 2)),
+      Execute { _ =>
+        eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
+        spark.sparkContext.killTaskAttempt(taskId)
+      },
+      Execute(waitForRateSourceTriggers(_, 4)),
+      IncrementEpoch(),
+      CheckAnswerRowsContains(scala.Range(0, 20).map(Row(_))))
+
+    spark.sparkContext.removeSparkListener(listener)
+  }
+
   test("query without test harness") {
     val df = spark.readStream
       .format("rate")

From 761fd26b93f404716887ec3d75d084f9db608c91 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Wed, 10 Jan 2018 15:57:25 -0800
Subject: [PATCH 03/12] bring in state update

---
 .../streaming/continuous/ContinuousExecution.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 1794194930eb..150a9483fcac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.continuous
 
 import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
@@ -78,6 +79,14 @@ class ContinuousExecution(
   }
 
   override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+    val stateUpdate = new UnaryOperator[State] {
+      override def apply(s: State) = s match {
+        // If we ended the query to reconfigure, reset the state to active.
+        case RECONFIGURING => ACTIVE
+        case _ => s
+      }
+    }
+
     do {
       try {
         runContinuous(sparkSessionForStream)

From 1bf613f2162ad07289d99c7ef3cbd0a7e2b73558 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Wed, 10 Jan 2018 15:59:51 -0800
Subject: [PATCH 04/12] bring in stream test sync

---
 .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index d46461fa9bf6..f6ac236db876 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -473,6 +473,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       // after starting the query.
       try {
         currentStream.awaitInitialization(streamingTimeout.toMillis)
+        currentStream match {
+          case s: ContinuousExecution => eventually("IncrementalExecution was not created") {
+            s.lastExecution.executedPlan // will fail if lastExecution is null
+          }
+          case _ =>
+        }
       } catch {
         case _: StreamingQueryException =>
           // Ignore the exception. `StopStream` or `ExpectFailure` will catch it as well.
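
A note on the state machinery the patches above lean on: the `state.updateAndGet(stateUpdate)` loop condition introduced in PATCH 02 only compiles once PATCH 03 adds the `stateUpdate` operator, and both rely on `java.util.concurrent.atomic.AtomicReference.updateAndGet` taking a `java.util.function.UnaryOperator`. The standalone sketch below illustrates that reset-on-reconfigure pattern; it is an editorial illustration, not Spark code, and the object name and simplified `State` values are made up for the example.

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

object StateUpdateSketch {
  sealed trait State
  case object ACTIVE extends State
  case object RECONFIGURING extends State
  case object TERMINATED extends State

  def main(args: Array[String]): Unit = {
    val state = new AtomicReference[State](ACTIVE)

    // Same shape as the stateUpdate operator in PATCH 03: a reconfiguration
    // restart flips the state back to ACTIVE, any other state is left alone.
    val stateUpdate = new UnaryOperator[State] {
      override def apply(s: State): State = s match {
        case RECONFIGURING => ACTIVE
        case other => other
      }
    }

    var runs = 0
    do {
      runs += 1
      // stand-in for runContinuous(): pretend the first two runs end in a reconfiguration
      state.set(if (runs < 3) RECONFIGURING else TERMINATED)
    } while (state.updateAndGet(stateUpdate) == ACTIVE)

    println(s"query body ran $runs times before terminating")
  }
}

A plausible reason for updateAndGet over a separate get/set is that the mapping and the read happen atomically, so a concurrent transition to a terminal state cannot be overwritten between the check and the reset.
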
From f1750949b12827d7c1e83e7c16c296c95285b2a5 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 11:18:53 -0800
Subject: [PATCH 05/12] stop before check in rate stream stress

---
 .../org/apache/spark/sql/streaming/StreamTest.scala |  5 ++++-
 .../sql/streaming/continuous/ContinuousSuite.scala  | 12 +++++-------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index f6ac236db876..291030c11bc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -641,7 +641,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
         }
 
       case CheckAnswerRowsContains(expectedAnswer, lastOnly) =>
-        val sparkAnswer = fetchStreamAnswer(currentStream, lastOnly)
+        val sparkAnswer = currentStream match {
+          case null => fetchStreamAnswer(lastStream, lastOnly)
+          case s => fetchStreamAnswer(s, lastOnly)
+        }
         QueryTest.includesRows(expectedAnswer, sparkAnswer).foreach { error => failTest(error) }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 14a031194004..601a0d115301 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -269,13 +269,9 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
       AwaitEpoch(0),
       Execute(waitForRateSourceTriggers(_, 201)),
      IncrementEpoch(),
-      Execute { query =>
-        val data = query.sink.asInstanceOf[MemorySinkV2].allData
-        val vals = data.map(_.getLong(0)).toSet
-        assert(scala.Range(0, 25000).forall { i =>
-          vals.contains(i)
-        })
-      })
+      StopStream,
+      CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_)))
+    )
   }
 
   test("automatic epoch advancement") {
@@ -291,6 +287,7 @@
       AwaitEpoch(0),
       Execute(waitForRateSourceTriggers(_, 201)),
       IncrementEpoch(),
+      StopStream,
       CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_))))
   }
 
@@ -322,6 +319,7 @@
       StopStream,
       StartStream(Trigger.Continuous(2012)),
       AwaitEpoch(50),
+      StopStream,
       CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_))))
   }
 }

From 3b19fcbc617730a38e9e241046ec6bbbb954f7ff Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 11:42:04 -0800
Subject: [PATCH 06/12] fix merge

---
 .../org/apache/spark/sql/streaming/StreamTest.scala |  5 -----
 .../sql/streaming/continuous/ContinuousSuite.scala  | 11 ++++++++++-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 0b279bda62c9..c75247e0f6ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -472,13 +472,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       currentStream.awaitInitialization(streamingTimeout.toMillis)
        currentStream match {
          case s: ContinuousExecution => eventually("IncrementalExecution was not created") {
-<<<<<<< HEAD
            s.lastExecution.executedPlan // will fail if lastExecution is null
          }
-=======
-            s.lastExecution.executedPlan // will fail if lastExecution is null
-          }
->>>>>>> apache/master
          case _ =>
        }
      } catch {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 601a0d115301..fe6e6f4f26d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming.continuous
 
+import java.util.UUID
+
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.sql._
@@ -216,15 +218,22 @@ class ContinuousSuite extends ContinuousSuiteBase {
       }
     }
     spark.sparkContext.addSparkListener(listener)
+
+    var originalRunId: UUID = null
     testStream(df, useV2Sink = true)(
       StartStream(Trigger.Continuous(100)),
       Execute(waitForRateSourceTriggers(_, 2)),
-      Execute { _ =>
+      Execute { query =>
+        // Wait until a task is started, then kill its first attempt.
         eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
+        originalRunId = query.runId
        spark.sparkContext.killTaskAttempt(taskId)
       },
       Execute(waitForRateSourceTriggers(_, 4)),
       IncrementEpoch(),
+      // Rather than just restarting the task we killed, there should have been a
+      // ContinuousExecution restart changing the run ID.
+      AssertOnQuery(_.runId != originalRunId),
       CheckAnswerRowsContains(scala.Range(0, 20).map(Row(_))))
 
     spark.sparkContext.removeSparkListener(listener)

From ad2f20685667ccd17e670729d68cc32b2a5533f5 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 13:19:05 -0800
Subject: [PATCH 07/12] rename test

---
 .../apache/spark/sql/streaming/continuous/ContinuousSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index fe6e6f4f26d8..2f55776695b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -201,7 +201,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
     StopStream)
   }
 
-  test("kill task") {
+  test("task failure triggers a ContinuousExecution restart") {
     val df = spark.readStream
       .format("rate")
       .option("numPartitions", "5")

From 54d3a2c1c07e9101e5e7f51ec7de67105b069d19 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 14:10:41 -0800
Subject: [PATCH 08/12] stop the query rather than retry looping when task tries to retry

---
 .../continuous/ContinuousExecution.scala | 10 +---------
 .../continuous/ContinuousSuite.scala      | 20 +++++++++----------
 2 files changed, 10 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index ee85d768d7d4..45b794c70a50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -89,15 +89,7 @@ class ContinuousExecution(
     }
 
     do {
-      try {
-        runContinuous(sparkSessionForStream)
-      } catch {
-        // Capture task retries (transformed to an exception by ContinuousDataSourceRDDIter) and
-        // convert them to global retries by letting the while loop spin.
-        case s: SparkException
-          if s.getCause != null && s.getCause.getCause != null &&
-            s.getCause.getCause.isInstanceOf[ContinuousTaskRetryException] => ()
-      }
+      runContinuous(sparkSessionForStream)
     } while (state.updateAndGet(stateUpdate) == ACTIVE)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 2f55776695b4..e71817d78f33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -201,7 +201,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
     StopStream)
   }
 
-  test("task failure triggers a ContinuousExecution restart") {
+  test("task failure kills the query") {
     val df = spark.readStream
       .format("rate")
       .option("numPartitions", "5")
@@ -218,23 +218,21 @@ class ContinuousSuite extends ContinuousSuiteBase {
       }
     }
     spark.sparkContext.addSparkListener(listener)
-
-    var originalRunId: UUID = null
     testStream(df, useV2Sink = true)(
       StartStream(Trigger.Continuous(100)),
       Execute(waitForRateSourceTriggers(_, 2)),
       Execute { query =>
         // Wait until a task is started, then kill its first attempt.
         eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
-        originalRunId = query.runId
         spark.sparkContext.killTaskAttempt(taskId)
-      },
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      // Rather than just restarting the task we killed, there should have been a
-      // ContinuousExecution restart changing the run ID.
-      AssertOnQuery(_.runId != originalRunId),
-      CheckAnswerRowsContains(scala.Range(0, 20).map(Row(_))))
+        eventually(timeout(streamingTimeout)) {
+          assert(query.exception.isDefined)
+        }
+        assert(
+          query.exception.get.getCause != null &&
+          query.exception.get.getCause.getCause != null &&
+          query.exception.get.getCause.getCause.getCause.isInstanceOf[ContinuousTaskRetryException])
+      })
 
     spark.sparkContext.removeSparkListener(listener)
   }

From b40c8f0fa1f8986f22a140eb58f316d12f0457c1 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 15:10:57 -0800
Subject: [PATCH 09/12] use finally

---
 .../continuous/ContinuousSuite.scala | 40 ++++++++++---------
 1 file changed, 22 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index e71817d78f33..af88d9591b56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -217,24 +217,28 @@ class ContinuousSuite extends ContinuousSuiteBase {
       }
     }
     spark.sparkContext.addSparkListener(listener)
-
-    testStream(df, useV2Sink = true)(
-      StartStream(Trigger.Continuous(100)),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      Execute { query =>
-        // Wait until a task is started, then kill its first attempt.
-        eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
-        spark.sparkContext.killTaskAttempt(taskId)
-        eventually(timeout(streamingTimeout)) {
-          assert(query.exception.isDefined)
-        }
-        assert(
-          query.exception.get.getCause != null &&
-          query.exception.get.getCause.getCause != null &&
-          query.exception.get.getCause.getCause.getCause.isInstanceOf[ContinuousTaskRetryException])
-      })
-
-    spark.sparkContext.removeSparkListener(listener)
+    try {
+      testStream(df, useV2Sink = true)(
+        StartStream(Trigger.Continuous(100)),
+        Execute(waitForRateSourceTriggers(_, 2)),
+        Execute { query =>
+          // Wait until a task is started, then kill its first attempt.
+          eventually(timeout(streamingTimeout)) {
+            assert(taskId != -1)
+          }
+          spark.sparkContext.killTaskAttempt(taskId)
+          eventually(timeout(streamingTimeout)) {
+            assert(query.exception.isDefined)
+          }
+          assert(
+            query.exception.get.getCause != null &&
+            query.exception.get.getCause.getCause != null &&
+            query.exception.get.getCause.getCause.getCause
+              .isInstanceOf[ContinuousTaskRetryException])
+        })
+    } finally {
+      spark.sparkContext.removeSparkListener(listener)
+    }
   }
 
   test("query without test harness") {

From b5d621b70416199311a69f4b3b9fbd6c03acf6df Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 15:20:33 -0800
Subject: [PATCH 10/12] use ExpectFailure

---
 .../streaming/continuous/ContinuousSuite.scala | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index af88d9591b56..4b4ed82dc652 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.util.UUID
 
-import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.sql._
@@ -221,20 +221,15 @@ class ContinuousSuite extends ContinuousSuiteBase {
       testStream(df, useV2Sink = true)(
         StartStream(Trigger.Continuous(100)),
         Execute(waitForRateSourceTriggers(_, 2)),
-        Execute { query =>
+        Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
-          eventually(timeout(streamingTimeout)) {
-            assert(query.exception.isDefined)
-          }
-          assert(
-            query.exception.get.getCause != null &&
-            query.exception.get.getCause.getCause != null &&
-            query.exception.get.getCause.getCause.getCause
-              .isInstanceOf[ContinuousTaskRetryException])
+        },
+        ExpectFailure[SparkException] { e =>
+          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
         })
     } finally {
       spark.sparkContext.removeSparkListener(listener)
     }

From cea2ddc255e92aec654a853a699957be7f462edd Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 15:42:05 -0800
Subject: [PATCH 11/12] add docs

---
 .../streaming/continuous/ContinuousDataSourceRDDIter.scala  | 1 +
 .../streaming/continuous/ContinuousTaskRetryException.scala | 3 +++
 2 files changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
index f34047964862..67b134020639 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
@@ -52,6 +52,7 @@ class ContinuousDataSourceRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    // If attempt number isn't 0, this is a task retry, which we don't support.
     if (context.attemptNumber() != 0) {
       throw new ContinuousTaskRetryException()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala
index 0bf004053bbc..e0a6f6dd50bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTaskRetryException.scala
@@ -19,5 +19,8 @@ package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark.SparkException
 
+/**
+ * An exception thrown when a continuous processing task runs with a nonzero attempt ID.
+ */
 class ContinuousTaskRetryException
   extends SparkException("Continuous execution does not support task retry", null)

From 49f1eb63840b80984542553ff1037f61e7149a83 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 11 Jan 2018 17:30:57 -0800
Subject: [PATCH 12/12] fix sync

---
 .../apache/spark/sql/kafka010/KafkaSourceSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index d66908f86ccc..33c76a3e49d9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -742,14 +742,14 @@ class KafkaSourceSuiteBase extends KafkaSourceTest {
     val query = kafka
       .writeStream
       .format("memory")
-      .outputMode("append")
       .queryName("kafkaColumnTypes")
       .trigger(defaultTrigger)
       .start()
-    query.processAllAvailable()
-    val rows = spark.table("kafkaColumnTypes").collect()
-    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
-    val row = rows(0)
+    eventually(timeout(streamingTimeout)) {
+      assert(spark.table("kafkaColumnTypes").count == 1,
+        s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
+    }
+    val row = spark.table("kafkaColumnTypes").head()
     assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
     assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
     assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")