Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Schema evolution is not supported yet. Here we only pick a single random sample file to
// figure out the schema of the whole dataset.
val sampleFile =
if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried running queries. The option avro.mapred.ignore.inputs.without.extension is not set in conf. This is a bug in spark-avro.
Please read the value from options. It would be good to have a new test case with avro.mapred.ignore.inputs.without.extension as true.

Copy link
Member Author

@MaxGekk MaxGekk Jul 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The avro.mapred.ignore.inputs.without.extension is a Hadoop parameter. This PR aims to change the default behavior only. I would prefer not to convert the Hadoop parameter into an Avro datasource option here.

Copy link
Member Author

@MaxGekk MaxGekk Jul 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is how people use the option so far: databricks/spark-avro#71 (comment) . Probably we should discuss, separately from this PR, how we could fix the "bug" without breaking backward compatibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Hadoop config can be changed like:

spark
  .sqlContext
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we submit a separate PR to add a new option for AVRO? We should not rely on hadoopConf to control the behaviors of AVRO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the PR: #21798 Please, have a look at it.

files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
throw new FileNotFoundException(
"No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
Expand Down Expand Up @@ -170,10 +170,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
if (
conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
!file.filePath.endsWith(".avro")
) {
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
Iterator.empty
} else {
val reader = {
Expand Down Expand Up @@ -278,4 +275,11 @@ private[avro] object AvroFileFormat {
value.readFields(new DataInputStream(in))
}
}

/**
 * Indicates whether input files lacking the ".avro" extension should be skipped.
 *
 * @param conf the Hadoop configuration to consult
 * @return the boolean value of `avro.mapred.ignore.inputs.without.extension`,
 *         or `false` (i.e. read such files) when the property is unset
 */
def ignoreFilesWithoutExtensions(conf: Configuration): Boolean =
  // Default is false: files without the .avro extension are NOT ignored.
  conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.avro

import java.io._
import java.nio.file.Files
import java.net.URL
import java.nio.file.{Files, Path, Paths}
import java.sql.{Date, Timestamp}
import java.util.{TimeZone, UUID}

Expand Down Expand Up @@ -622,7 +623,12 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
intercept[FileNotFoundException] {
withTempPath { dir =>
FileUtils.touch(new File(dir, "test"))
spark.read.avro(dir.toString)
val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
try {
hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
spark.read.avro(dir.toString)
} finally {
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) }
}
}

Expand Down Expand Up @@ -684,12 +690,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

Files.createFile(new File(tempSaveDir, "non-avro").toPath)

val newDf = spark
.read
.option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
.avro(tempSaveDir)
val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
val count = try {
hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
val newDf = spark
.read
.avro(tempSaveDir)
newDf.count()
} finally {
hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
}

assert(newDf.count == 8)
assert(count == 8)
}
}

Expand Down Expand Up @@ -805,4 +817,23 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(readDf.collect().sameElements(writeDf.collect()))
}
}

// SPARK-24805: by default the Avro reader must pick up input files even when
// they lack the ".avro" extension.
test("SPARK-24805: do not ignore files without .avro extension by default") {
  withTempDir { dir =>
    // Copy the sample data into a file named "episodes" (no .avro extension).
    Files.copy(
      Paths.get(new URL(episodesAvro).toURI),
      Paths.get(dir.getCanonicalPath, "episodes"))
    val extensionlessPath = s"${dir.getCanonicalPath}/episodes"

    // Schema inference must still locate the file and read all 8 rows.
    assert(spark.read.avro(extensionlessPath).count == 8)

    // The same must hold when an explicit schema is supplied.
    val explicitSchema = new StructType()
      .add("title", StringType)
      .add("air_date", StringType)
      .add("doctor", IntegerType)
    assert(spark.read.schema(explicitSchema).avro(extensionlessPath).count == 8)
  }
}
}