ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.SparkException
import org.apache.spark.{DebugFilesystem, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -242,6 +242,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

/**
* This is part of the test 'Enabling/disabling ignoreCorruptFiles', but run in a loop
* to increase the chance of failure.
*/
ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
def testIgnoreCorruptFiles(): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
val df = spark.read.parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
checkAnswer(
df,
Seq(Row(0), Row(1)))
}
}

for (i <- 1 to 100) {
DebugFilesystem.clearOpenStreams()
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
val exception = intercept[SparkException] {
testIgnoreCorruptFiles()
}
assert(exception.getMessage().contains("is not a Parquet file"))
}
DebugFilesystem.assertNoOpenStreams()
}
}

test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
withTempPath { dir =>
val basePath = dir.getCanonicalPath
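For readers unfamiliar with the flag the looped test above exercises, here is a minimal standalone sketch of the same behaviour outside the test harness. The local SparkSession setup and the /tmp paths are illustrative assumptions, not part of this change; the config key spark.sql.files.ignoreCorruptFiles is the runtime name behind SQLConf.IGNORE_CORRUPT_FILES.

// Minimal sketch: mix Parquet and non-Parquet output, then read both
// directories as Parquet with ignoreCorruptFiles off and on.
import org.apache.spark.sql.SparkSession

object IgnoreCorruptFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ignore-corrupt-files-sketch")
      .getOrCreate()

    // One directory of real Parquet files, one directory of JSON files that
    // will look "corrupt" to the Parquet reader.
    spark.range(0, 2).toDF("a").write.mode("overwrite").parquet("/tmp/sketch/parquet")
    spark.range(2, 3).toDF("a").write.mode("overwrite").json("/tmp/sketch/json")

    // Flag off (the default): the read fails with a SparkException whose
    // message contains "is not a Parquet file", as the looped test asserts.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")
    // spark.read.parquet("/tmp/sketch/parquet", "/tmp/sketch/json").count()

    // Flag on: non-Parquet files are skipped and only rows 0 and 1 survive.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    spark.read.parquet("/tmp/sketch/parquet", "/tmp/sketch/json").show()

    spark.stop()
  }
}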
SQLTestUtils.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.test
import java.io.File
import java.util.UUID

import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
@@ -48,7 +50,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
* prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
*/
private[sql] trait SQLTestUtils
extends SparkFunSuite
extends SparkFunSuite with Eventually
with BeforeAndAfterAll
with SQLTestData { self =>

@@ -122,6 +124,15 @@ private[sql] trait SQLTestUtils
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Waits for all tasks on all executors to be finished.
*/
protected def waitForTasksToFinish(): Unit = {
eventually(timeout(10.seconds)) {
assert(spark.sparkContext.statusTracker
.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
}
}
/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
@@ -130,7 +141,11 @@
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
try f(dir) finally {
// wait for all tasks to finish before deleting files
waitForTasksToFinish()
Utils.deleteRecursively(dir)
}
}

/**
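The helper added above leans on ScalaTest's Eventually to poll the status tracker rather than assert once. A minimal sketch of that retry pattern in isolation follows; the object and method names are illustrative, and it assumes ScalaTest on the classpath as in Spark's own test build.

import scala.concurrent.duration._

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._

import org.apache.spark.SparkContext

object WaitForTasksSketch {
  // eventually() re-runs the block whenever the assertion fails, until it
  // passes or the 10-second timeout expires, so a transiently non-zero
  // running-task count does not fail the caller outright.
  def waitForTasksToFinish(sc: SparkContext): Unit = {
    eventually(timeout(10.seconds)) {
      val running = sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum
      assert(running == 0)
    }
  }
}

A suite that mixes in Eventually directly, as SQLTestUtils now does, gets the same eventually and timeout members without the object import.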
SharedSQLContext.scala
@@ -17,7 +17,10 @@

package org.apache.spark.sql.test

import scala.concurrent.duration._

import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually

import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -26,7 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
/**
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
*/
trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {

protected val sparkConf = new SparkConf()

@@ -86,6 +89,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {

protected override def afterEach(): Unit = {
super.afterEach()
DebugFilesystem.assertNoOpenStreams()
// files can be closed from other threads, so wait a bit
// normally this doesn't take more than 1s
eventually(timeout(10.seconds)) {
DebugFilesystem.assertNoOpenStreams()
}
}
}
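Downstream suites do not call any of this explicitly; they inherit the retried open-stream check by extending SharedSQLContext. A hedged sketch of such a suite follows: the suite name and test body are hypothetical, and it is assumed to live under org.apache.spark.sql so the private[sql] test helpers resolve.

package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSQLContext

class LeakCheckedExampleSuite extends QueryTest with SharedSQLContext {

  test("fully consuming a Parquet table leaves no open streams") {
    withTempPath { dir =>
      spark.range(10).toDF("a").write.parquet(dir.getCanonicalPath)
      assert(spark.read.parquet(dir.getCanonicalPath).count() == 10)
    }
    // afterEach() above now polls DebugFilesystem for up to 10 seconds, so
    // streams closed by background threads no longer cause spurious failures.
  }
}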