
Commit 6f48fc3

1 parent 66e7a8f commit 6f48fc3

File tree: 3 files changed, +60 -5 lines


sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 34 additions & 1 deletion

@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -242,6 +242,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+   * to increase the chance of failure
+   */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (i <- 1 to 100) {
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
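For context, the behavior the looped test exercises can be reproduced outside the suite harness. The sketch below is not part of the commit; the local session, temp directory, and file layout are illustrative. With spark.sql.files.ignoreCorruptFiles left at its default of false, reading the JSON directory through the Parquet reader throws a SparkException whose message contains "is not a Parquet file"; with the flag enabled, unreadable files are skipped and only the Parquet rows come back.

import org.apache.spark.sql.SparkSession

object IgnoreCorruptFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val base = java.nio.file.Files.createTempDirectory("ignore-corrupt").toString

    spark.range(1).toDF("a").write.parquet(s"$base/first")   // valid Parquet
    spark.range(2, 3).toDF("a").write.json(s"$base/third")   // JSON, i.e. "corrupt" when read as Parquet

    // Enabled: files that cannot be read as Parquet are skipped instead of failing the job.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    spark.read.parquet(s"$base/first", s"$base/third").show()   // only Row(0) survives

    spark.stop()
  }
}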

sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 17 additions & 2 deletions

@@ -20,12 +20,14 @@ package org.apache.spark.sql.test
 import java.io.File
 import java.util.UUID
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -48,7 +50,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -122,6 +124,15 @@ private[sql] trait SQLTestUtils
     try f(path) finally Utils.deleteRecursively(path)
   }
 
+  /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -130,7 +141,11 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
   }
 
   /**
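The waitForTasksToFinish helper leans on ScalaTest's Eventually trait: the passed block is re-executed until it stops throwing or the timeout expires, rather than being asserted once. Below is a minimal self-contained illustration of that retry semantics, assuming a recent ScalaTest; the suite name and the polled flag are made up for the example.

import org.scalatest.concurrent.Eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

class EventuallyExampleSuite extends AnyFunSuite with Eventually {
  test("a condition that becomes true asynchronously") {
    @volatile var done = false
    new Thread(() => { Thread.sleep(200); done = true }).start()

    // Retried at the default polling interval for up to 10 seconds; the test fails
    // only if the assertion is still throwing when the deadline passes.
    eventually(timeout(10.seconds)) {
      assert(done)
    }
  }
}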

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala

Lines changed: 9 additions & 2 deletions

@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -26,7 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
 
@@ -86,6 +89,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // files can be closed from other threads, so wait a bit
+    // normally this doesn't take more than 1s
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }
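The same retry idea carries the leak check in afterEach: open streams can be closed by tasks on other threads shortly after a test body returns, so the assertion is polled for a bounded time instead of being made once. Here is a condensed sketch of that pattern, not the commit's code; OpenResourceTracker is a hypothetical stand-in for DebugFilesystem's open-stream bookkeeping.

import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

// Hypothetical resource registry used only for this sketch.
object OpenResourceTracker {
  private val open = java.util.concurrent.ConcurrentHashMap.newKeySet[String]()
  def acquire(name: String): Unit = open.add(name)
  def release(name: String): Unit = open.remove(name)
  def assertNoneOpen(): Unit =
    if (!open.isEmpty) throw new IllegalStateException(s"still open: $open")
}

class LeakCheckedSuite extends AnyFunSuite with BeforeAndAfterEach with Eventually {
  override protected def afterEach(): Unit = {
    super.afterEach()
    // Resources may be released from other threads just after the test returns,
    // so retry the check for up to 10 seconds rather than failing immediately.
    eventually(timeout(10.seconds)) {
      OpenResourceTracker.assertNoneOpen()
    }
  }

  test("background work releases its resource") {
    OpenResourceTracker.acquire("stream-1")
    new Thread(() => { Thread.sleep(100); OpenResourceTracker.release("stream-1") }).start()
  }
}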
