@@ -368,7 +368,7 @@ case class FileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions.flatMap { p =>
p.files.map { f =>
p.files.filter(_.getLen > 0).map { f =>

Member:
Do the filtering inside the map?

Member:
Do we have a test case for this line?

Contributor:
Do you mean changing filter...map... to flatMap? I don't have a strong preference either way.

The updated test cases and the new test case are for this change.

Member:
I personally prefer filter + map as it's shorter and clearer. I don't know if one is faster; two transformations vs having to return Some/None. For a Dataset operation I'd favor one operation, but this is just local Scala code.
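
For reference, a tiny standalone Scala illustration of the two spellings being discussed (the FileStatus stand-in and sample data are hypothetical, not Spark's actual types):

```scala
// Hypothetical stand-in for Hadoop's FileStatus, just for this example.
case class FileStatus(path: String, getLen: Long)

val files = Seq(FileStatus("part-0", 10L), FileStatus("empty", 0L))

// The merged change: filter out empty files, then map.
val kept = files.filter(_.getLen > 0).map(_.path)

// The suggested alternative: a single flatMap returning Option.
val keptAlt = files.flatMap(f => if (f.getLen > 0) Some(f.path) else None)

assert(kept == keptAlt) // both yield Seq("part-0")
```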

Member:
It's not a performance-critical path. Should be okay.

Member:
This createBucketedReadRDD is for bucketed tables, right?

Contributor:
Yes, and the same change is also in createNonBucketedReadRDD.

          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
        }
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
partition.files.filter(_.getLen > 0).flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
@@ -1842,7 +1842,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
      val path = dir.getCanonicalPath
      primitiveFieldAndType
        .toDF("value")
-       .repartition(1)
        .write
        .text(path)

@@ -1910,7 +1909,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
F.count($"dummy").as("valid"),
F.count($"_corrupt_record").as("corrupt"),
F.count("*").as("count"))
checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
checkAnswer(counts, Row(1, 4, 6))
}
}

@@ -18,6 +18,8 @@
package org.apache.spark.sql.sources

import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}

import org.scalatest.BeforeAndAfter

@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter
      assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
    }
  }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)

Contributor:
Does this test fail without your change? IIUC one partition can read multiple files. Is JSON the only data source that may return a row for an empty file?

Member Author:
> Does this test fail without your change?

Yes, it does, because of wholetext.

> Is JSON the only data source that may return a row for an empty file?

We depend on the underlying parser here. I will check CSV and Text.

Contributor:
Do you mean wholetext mode forces one partition per file?

Member Author:
I think so; wholetext makes files non-splittable:

super.isSplitable(sparkSession, options, path) && !textOptions.wholeText

This guarantees (in text data sources at least) one file -> one partition.

> IIUC one partition can read multiple files.

Do you mean this code?

if (currentSize + file.length > maxSplitBytes) {
  closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
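
For context, a rough sketch of the split-then-pack behavior being discussed (simplified; SplitFile and the function signatures are hypothetical stand-ins, not Spark's actual API):

```scala
// Hypothetical stand-in for Spark's PartitionedFile.
case class SplitFile(path: String, start: Long, length: Long)

// Splittable files are chopped into maxSplitBytes-sized chunks;
// non-splittable ones (e.g. wholetext) become a single split each.
def toSplits(files: Seq[(String, Long)], isSplitable: Boolean,
    maxSplitBytes: Long): Seq[SplitFile] =
  files.flatMap { case (path, len) =>
    if (isSplitable) {
      (0L until len by maxSplitBytes)
        .map(off => SplitFile(path, off, math.min(maxSplitBytes, len - off)))
    } else {
      Seq(SplitFile(path, 0L, len))
    }
  }

// Greedy packing, mirroring the quoted loop: each split is padded with
// openCostInBytes, so small files quickly exhaust the budget and land
// in separate partitions.
def pack(splits: Seq[SplitFile], maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[SplitFile]] = {
  val partitions = Seq.newBuilder[Seq[SplitFile]]
  var current = Vector.empty[SplitFile]
  var currentSize = 0L
  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current
    current = Vector.empty
    currentSize = 0L
  }
  splits.sortBy(-_.length).foreach { split =>
    if (currentSize + split.length > maxSplitBytes) closePartition()
    currentSize += split.length + openCostInBytes
    current :+= split
  }
  closePartition()
  partitions.result()
}

// With wholetext (not splittable) and the empty file filtered out,
// the single 1-byte file yields exactly one split and one partition.
val splits = toSplits(Seq(("notEmpty", 1L)), isSplitable = false, maxSplitBytes = 4L << 20)
assert(pack(splits, maxSplitBytes = 4L << 20, openCostInBytes = 4L << 20).length == 1)
```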

Contributor:
Thanks for pointing it out; I think we are good here.

+    }
+  }
}