AvroFileFormat.scala
@@ -58,21 +58,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, conf)
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
     val sampleFile =
-      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
-            " is set to true. Do all input files have \".avro\" extension?"
-          )
+      if (parsedOptions.ignoreExtension) {
+        files.headOption.getOrElse {
+          throw new FileNotFoundException("No files were found for schema inference.")
         }
       } else {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("No Avro files found.")
+        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+          throw new FileNotFoundException(
+            "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
         }
       }
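For context, a minimal sketch of the user-facing behavior this hunk implements. The `spark.read.avro(...)` syntax mirrors the test suite's implicits; the path is illustrative, not from this PR:

```scala
// ignoreExtension = true: any file is a candidate for schema inference;
// the read fails only if the directory is empty.
val dfAll = spark.read
  .option("ignoreExtension", "true")
  .avro("/data/events")  // illustrative path

// ignoreExtension = false: only *.avro files are considered, and a
// directory without any fails with the "set ignoreExtension to true" hint.
val dfAvroOnly = spark.read
  .option("ignoreExtension", "false")
  .avro("/data/events")
```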
@@ -115,7 +113,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
     val outputAvroSchema = SchemaConverters.toAvroType(
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
@@ -160,7 +158,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, hadoopConf)
 
     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
@@ -171,9 +169,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       // Doing input file filtering is improper because we may generate empty tasks that process no
       // input files but stress the scheduler. We should probably add a more general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
-        Iterator.empty
-      } else {
+      if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
         val reader = {
           val in = new FsInput(new Path(new URI(file.filePath)), conf)
           try {
@@ -228,6 +224,8 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
             deserializer.deserialize(record).asInstanceOf[InternalRow]
           }
         }
+      } else {
+        Iterator.empty
       }
     }
   }
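Worth noting while reviewing this hunk pair: at read time a non-matching file yields an empty iterator rather than an error, unlike schema inference above. A hedged sketch, with the directory contents assumed for illustration:

```scala
// Assume /data/mixed contains part-00000.avro plus a stray "README" file.
val df = spark.read
  .option("ignoreExtension", "false")
  .avro("/data/mixed")
// Rows come only from part-00000.avro; the stray file raises no error,
// though an empty task may still be scheduled for it (see SPARK-16317).
```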
@@ -274,11 +272,4 @@ private[avro] object AvroFileFormat {
       value.readFields(new DataInputStream(in))
     }
   }
-
-  def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
-    // Files without .avro extensions are not ignored by default
-    val defaultValue = false
-
-    conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
-  }
 }
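The removed helper read the Hadoop property directly; the same property now feeds `AvroOptions.ignoreExtension` as a fallback, so the legacy route keeps working. A minimal sketch of that route (the path is illustrative):

```scala
// Setting only the Hadoop property, with no ignoreExtension option,
// resolves to ignoreExtension = !true = false: only *.avro files are read.
spark.sparkContext.hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")
val df = spark.read.avro("/data/events")  // illustrative path
```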
AvroOptions.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
  */
-class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
-  extends Logging with Serializable {
+class AvroOptions(
+    @transient val parameters: CaseInsensitiveMap[String],
+    @transient val conf: Configuration) extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], conf: Configuration) = {
+    this(CaseInsensitiveMap(parameters), conf)
+  }
 
   /**
    * Optional schema provided by a user in JSON format.
@@ -45,4 +50,22 @@ class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
    * See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
    */
   val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+
+  /**
+   * The `ignoreExtension` option controls whether files without the `.avro` extension are
+   * loaded on read. If enabled, all files (with and without the extension) are loaded.
+   * If not set, the Hadoop config `avro.mapred.ignore.inputs.without.extension` is taken
+   * into account; if that is not set either, file extensions are ignored and all files are loaded.
+   */
+  val ignoreExtension: Boolean = {
+    val ignoreFilesWithoutExtensionByDefault = false
+    val ignoreFilesWithoutExtension = conf.getBoolean(
+      AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
+      ignoreFilesWithoutExtensionByDefault)
+
+    parameters
+      .get("ignoreExtension")
+      .map(_.toBoolean)
+      .getOrElse(!ignoreFilesWithoutExtension)
+  }
 }
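A standalone sketch of the resolution order implemented by `ignoreExtension` above; `resolveIgnoreExtension` is a hypothetical stand-in for the `val`, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical helper mirroring the val's logic: the explicit option wins,
// otherwise the negated Hadoop property, which itself defaults to false.
def resolveIgnoreExtension(option: Option[String], conf: Configuration): Boolean = {
  val ignoreFilesWithoutExtension =
    conf.getBoolean("avro.mapred.ignore.inputs.without.extension", false)
  option.map(_.toBoolean).getOrElse(!ignoreFilesWithoutExtension)
}

val conf = new Configuration()
assert(resolveIgnoreExtension(None, conf))           // defaults: all files loaded
conf.setBoolean("avro.mapred.ignore.inputs.without.extension", true)
assert(!resolveIgnoreExtension(None, conf))          // Hadoop property honored
assert(resolveIgnoreExtension(Some("true"), conf))   // explicit option overrides
```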
AvroSuite.scala
@@ -630,10 +630,21 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
           hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
           spark.read.avro(dir.toString)
         } finally {
-          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) }
+          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        }
       }
     }
 
+    intercept[FileNotFoundException] {
+      withTempPath { dir =>
+        FileUtils.touch(new File(dir, "test"))
+
+        spark
+          .read
+          .option("ignoreExtension", false)
+          .avro(dir.toString)
+      }
+    }
   }
 
   test("SQL test insert overwrite") {
@@ -702,7 +713,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       } finally {
         hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
       }
-
       assert(count == 8)
     }
   }
@@ -838,4 +848,45 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       assert(df2.count == 8)
     }
   }
+
+  test("SPARK-24836: checking the ignoreExtension option") {
+    withTempPath { tempDir =>
+      val df = spark.read.avro(episodesAvro)
+      assert(df.count == 8)
+
+      val tempSaveDir = s"$tempDir/save/"
+      df.write.avro(tempSaveDir)
+
+      Files.createFile(new File(tempSaveDir, "non-avro").toPath)
+
+      val newDf = spark
+        .read
+        .option("ignoreExtension", false)
+        .avro(tempSaveDir)
+
+      assert(newDf.count == 8)
+    }
+  }
+
+  test("SPARK-24836: ignoreExtension must override hadoop's config") {
+    withTempDir { dir =>
+      Files.copy(
+        Paths.get(new URL(episodesAvro).toURI),
+        Paths.get(dir.getCanonicalPath, "episodes"))
+
+      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+      val count = try {
+        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        val newDf = spark
+          .read
+          .option("ignoreExtension", "true")
+          .avro(s"${dir.getCanonicalPath}/episodes")
+        newDf.count()
+      } finally {
+        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+      }
+
+      assert(count == 8)
+    }
+  }
 }