
Commit c5a31d1

bogdanrdc authored and hvanhovell committed
[SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test
## What changes were proposed in this pull request?

SharedSQLContext.afterEach now calls DebugFilesystem.assertNoOpenStreams inside eventually. SQLTestUtils.withTempDir now calls waitForTasksToFinish before deleting the directory.

## How was this patch tested?

Added a new test in ParquetQuerySuite based on the flaky test.

Author: Bogdan Raducanu <[email protected]>

Closes #17701 from bogdanrdc/SPARK-20407.
1 parent b91873d · commit c5a31d1
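The fix hinges on ScalaTest's Eventually: rather than asserting once, immediately after a test, that no file streams remain open, the assertion is retried until it passes or a timeout expires, which tolerates streams that another thread closes a moment later. Below is a minimal, hedged sketch of that retry pattern; the EventuallySketch object, its openStreams counter, and the 500 ms closer thread are invented for illustration and only stand in for DebugFilesystem and an executor task thread.

import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch extends App {
  // Hypothetical stand-in for DebugFilesystem's open-stream bookkeeping (illustration only).
  val openStreams = new AtomicInteger(1) // pretend one stream is still open
  def assertNoOpenStreams(): Unit = {
    val n = openStreams.get()
    if (n != 0) throw new IllegalStateException(s"$n stream(s) still open")
  }

  // Another thread closes the stream slightly later, as an executor task thread might.
  new Thread(() => { Thread.sleep(500); openStreams.set(0) }).start()

  // A bare assertNoOpenStreams() here could fire before the closer thread runs; wrapped in
  // eventually it is retried until it passes or the 10 second timeout is exceeded.
  eventually(timeout(10.seconds)) {
    assertNoOpenStreams()
  }
  println("no open streams")
}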


3 files changed (+60, -7 lines)


sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 34 additions & 1 deletion
@@ -23,7 +23,7 @@ import java.sql.Timestamp
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+   * to increase the chance of failure
+   */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (i <- 1 to 100) {
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
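For context on the flag the new test loops over: SQLConf.IGNORE_CORRUPT_FILES is the spark.sql.files.ignoreCorruptFiles setting. With it disabled, pointing the Parquet reader at the JSON directory fails with a SparkException whose message contains "is not a Parquet file"; with it enabled, the unreadable files are skipped and only the Parquet rows come back. A rough standalone sketch of that behavior, with illustrative paths and a throwaway local session (not part of the patch):

import org.apache.spark.sql.SparkSession

object IgnoreCorruptFilesSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("ignore-corrupt-files").getOrCreate()
  val base = java.nio.file.Files.createTempDirectory("ignore-corrupt").toString

  spark.range(1).toDF("a").write.parquet(s"$base/first")  // valid Parquet: row 0
  spark.range(2, 3).toDF("a").write.json(s"$base/third")  // JSON, i.e. "corrupt" for a Parquet read

  // Strict mode (the branch the flaky test exercises): the mixed read throws
  // a SparkException containing "is not a Parquet file".
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")
  // spark.read.parquet(s"$base/first", s"$base/third").collect()

  // Lenient mode: the JSON part is skipped and only row 0 is returned.
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
  spark.read.parquet(s"$base/first", s"$base/third").show()

  spark.stop()
}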

sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 17 additions & 2 deletions
@@ -22,11 +22,13 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.{Locale, UUID}
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -138,6 +140,15 @@ private[sql] trait SQLTestUtils
     }
   }
 
+  /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -146,7 +157,11 @@ private[sql] trait SQLTestUtils
    */
  protected def withTempDir(f: File => Unit): Unit = {
    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
  }
 
  /**
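The new waitForTasksToFinish helper polls SparkStatusTracker until every executor reports zero running tasks, so withTempDir no longer deletes the directory while a straggling task may still be reading from it. The same polling idea can be expressed against public APIs outside the test trait; the sketch below uses a crude deadline loop where the patch uses ScalaTest's eventually, and the session setup is illustrative only.

import org.apache.spark.sql.SparkSession

object WaitForTasksSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("wait-for-tasks").getOrCreate()

  spark.range(0, 1000).count()  // run a small job so some tasks get scheduled

  // Sum of running tasks reported by all executors, as in the patch above.
  def runningTasks(): Int =
    spark.sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum

  // Poll until no executor reports a running task, or give up after roughly 10 seconds.
  val deadline = System.nanoTime() + 10L * 1000 * 1000 * 1000
  while (runningTasks() > 0 && System.nanoTime() < deadline) {
    Thread.sleep(100)
  }
  // Only now would it be safe to delete temporary files the job may have been reading.

  spark.stop()
}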

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala

Lines changed: 9 additions & 4 deletions
@@ -17,17 +17,18 @@
 
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
 
@@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // files can be closed from other threads, so wait a bit
+    // normally this doesn't take more than 1s
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }
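As used here, DebugFilesystem is a test-only helper that records the streams it opens so afterEach can detect leaks via clearOpenStreams and assertNoOpenStreams; wrapping the check in eventually simply tolerates streams that another thread closes just after the test body returns. Purely to illustrate the kind of bookkeeping involved, here is a hypothetical toy registry; it is not the real DebugFilesystem, and only the two method names already shown in the diffs above are taken from the patch.

import java.util.concurrent.ConcurrentHashMap

// Toy illustration only: a registry of open stream paths with a check-and-clear surface
// similar to the one the tests above rely on (opened/closed are invented helper names).
object ToyOpenStreamRegistry {
  private val open = ConcurrentHashMap.newKeySet[String]()

  def opened(path: String): Unit = open.add(path)
  def closed(path: String): Unit = open.remove(path)
  def clearOpenStreams(): Unit = open.clear()

  // Fails while any registered stream has not been closed, mirroring assertNoOpenStreams in spirit.
  def assertNoOpenStreams(): Unit = {
    if (!open.isEmpty) {
      throw new IllegalStateException(s"${open.size()} stream(s) still open: $open")
    }
  }
}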
