Commit 72cf1d1

reverted mistake commit
1 parent 03a4281 commit 72cf1d1

File tree

3 files changed, +7 -60 lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 1 addition & 34 deletions
@@ -23,7 +23,7 @@ import java.sql.Timestamp
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.{DebugFilesystem, SparkException}
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,39 +316,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
-  /**
-   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
-   * to increase the chance of failure
-   */
-  test("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
-    def testIgnoreCorruptFiles(): Unit = {
-      withTempDir { dir =>
-        val basePath = dir.getCanonicalPath
-        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
-        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
-        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
-        val df = spark.read.parquet(
-          new Path(basePath, "first").toString,
-          new Path(basePath, "second").toString,
-          new Path(basePath, "third").toString)
-        checkAnswer(
-          df,
-          Seq(Row(0), Row(1)))
-      }
-    }
-
-    for (i <- 1 to 100) {
-      DebugFilesystem.clearOpenStreams()
-      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
-        val exception = intercept[SparkException] {
-          testIgnoreCorruptFiles()
-        }
-        assert(exception.getMessage().contains("is not a Parquet file"))
-      }
-      DebugFilesystem.assertNoOpenStreams()
-    }
-  }
-
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath

sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 2 additions & 17 deletions
@@ -22,13 +22,11 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.UUID
 
-import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -51,7 +49,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite with Eventually
+  extends SparkFunSuite
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -140,15 +138,6 @@ private[sql] trait SQLTestUtils
     }
   }
 
-  /**
-   * Waits for all tasks on all executors to be finished.
-   */
-  protected def waitForTasksToFinish(): Unit = {
-    eventually(timeout(10.seconds)) {
-      assert(spark.sparkContext.statusTracker
-        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
-    }
-  }
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
@@ -157,11 +146,7 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally {
-      // wait for all tasks to finish before deleting files
-      waitForTasksToFinish()
-      Utils.deleteRecursively(dir)
-    }
+    try f(dir) finally Utils.deleteRecursively(dir)
   }
 
   /**

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala

Lines changed: 4 additions & 9 deletions
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.test
 
-import scala.concurrent.duration._
-
 import org.scalatest.BeforeAndAfterEach
-import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
+
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected val sparkConf = new SparkConf()
 
@@ -85,10 +84,6 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    // files can be closed from other threads, so wait a bit
-    // normally this doesn't take more than 1s
-    eventually(timeout(10.seconds)) {
-      DebugFilesystem.assertNoOpenStreams()
-    }
+    DebugFilesystem.assertNoOpenStreams()
   }
 }
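The hunk above drops the ScalaTest Eventually retry around DebugFilesystem.assertNoOpenStreams(), so afterEach now asserts immediately instead of waiting for files that may be closed from other threads. For context, a minimal, self-contained sketch of the eventually(timeout(...)) retry pattern being removed; the noOpenStreams condition below is a hypothetical stand-in for illustration, not Spark's DebugFilesystem:

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyExample {
  // Hypothetical condition: becomes true roughly two seconds after startup,
  // standing in for "all streams have been closed by background threads".
  private val readyAt = System.currentTimeMillis() + 2000L
  private def noOpenStreams(): Boolean = System.currentTimeMillis() >= readyAt

  def main(args: Array[String]): Unit = {
    // Re-evaluates the block until the assertion passes or 10 seconds elapse,
    // rather than failing on the first attempt.
    eventually(timeout(10.seconds)) {
      assert(noOpenStreams(), "streams still open")
    }
    println("no open streams")
  }
}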
