Skip to content

Commit bc11dd5

Browse files
committed
Simplify and fix tests.
1 parent cc6d98a commit bc11dd5

File tree

1 file changed

+11
-18
lines changed

1 file changed

+11
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -85,30 +85,23 @@ class FileScanRDD(
8585

8686
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
8787
private def nextIterator(): Boolean = {
88-
if (asyncIO) {
89-
if (nextFile == null) return false
90-
} else {
91-
if (!files.hasNext) return false
92-
}
93-
94-
// Wait for the async task to complete
9588
val file = if (asyncIO) {
89+
if (nextFile == null) return false
90+
// Wait for the async task to complete
9691
Await.result(nextFile, Duration.Inf)
9792
} else {
93+
if (!files.hasNext) return false
9894
val f = files.next()
99-
val it = readFunction(f)
100-
NextFile(f, it)
95+
NextFile(f, readFunction(f))
10196
}
10297

10398
// This is only used to evaluate the rest of the execution so we can safely set it here.
10499
SqlNewHadoopRDDState.setInputFileName(file.file.filePath)
105100
currentIterator = file.iter
106101

107-
if (asyncIO && files.hasNext) {
102+
if (asyncIO) {
108103
// Asynchronously start the next file.
109104
nextFile = prepareNextFile()
110-
} else {
111-
nextFile = null
112105
}
113106

114107
hasNext
@@ -119,17 +112,17 @@ class FileScanRDD(
119112
}
120113

121114
def prepareNextFile() = {
122-
Future {
123-
if (files.hasNext) {
115+
if (files.hasNext) {
116+
Future {
124117
val file = files.next()
125118
val it = readFunction(file)
126119
// Read something from the file to trigger some initial IO.
127120
it.hasNext
128121
NextFile(file, it)
129-
} else {
130-
null
131-
}
132-
}(FileScanRDD.ioExecutionContext)
122+
}(FileScanRDD.ioExecutionContext)
123+
} else {
124+
null
125+
}
133126
}
134127
}
135128

0 commit comments

Comments (0)