From 35063ef8e734bdeb39316e02b2e8451d0d75d43a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 14 Jul 2018 11:49:30 +0200 Subject: [PATCH 1/9] Test for reading files without avro extension --- external/avro/src/test/resources/episodesAvro | Bin 0 -> 597 bytes .../org/apache/spark/sql/avro/AvroSuite.scala | 6 ++++++ 2 files changed, 6 insertions(+) create mode 100644 external/avro/src/test/resources/episodesAvro diff --git a/external/avro/src/test/resources/episodesAvro b/external/avro/src/test/resources/episodesAvro new file mode 100644 index 0000000000000000000000000000000000000000..58a028ce19e6a1e964465dba8f9fbf5c84b3c3e5 GIT binary patch literal 597 zcmZ`$$w~u35Ov8x#Dj=L5s_jL1qmVRM7@a%A}%2+f)b^isW@$Vx`*!0$Pn`jp8Nt& zeudv=PZl@uSoPMfKD&P$pU7gYWL|p#h4`N7Iwpz8*>)6pQu$8K5g4X3MNCVd^l+mi z^wPBcH1bcl6SZw%Sr`PO_y|f#X$L9-g z;dAr)w);_-ea$!*mc7p@CSd|NlpVELhMh<;4y8h|knQ6Gw{;CytVP*k1x_$Y;bL~} zP%34!WeX0_W;dkQhBBN}WGK8R1;wpeZEAH#z@;EmCg2I|28{bqD#NLaM Date: Sat, 14 Jul 2018 12:38:33 +0200 Subject: [PATCH 2/9] Fix tests --- .../org/apache/spark/sql/avro/AvroFileFormat.scala | 14 +++++++++----- .../org/apache/spark/sql/avro/AvroSuite.scala | 13 ++++++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 46e5a189c5eb3..d6fc844ed437b 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -64,7 +64,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister { // Schema evolution is not supported yet. Here we only pick a single random sample file to // figure out the schema of the whole dataset. val sampleFile = - if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) { + if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) { files.find(_.getPath.getName.endsWith(".avro")).getOrElse { throw new FileNotFoundException( "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " + @@ -172,10 +172,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister { // Doing input file filtering is improper because we may generate empty tasks that process no // input files but stress the scheduler. We should probably add a more general input file // filtering mechanism for `FileFormat` data sources. See SPARK-16317. - if ( - conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) && - !file.filePath.endsWith(".avro") - ) { + if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) { Iterator.empty } else { val reader = { @@ -286,4 +283,11 @@ private[avro] object AvroFileFormat { value.readFields(new DataInputStream(in)) } } + + def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = { + // Files without .avro extensions are not ignored by default + val defaultValue = false + + conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue) + } } diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 7eeede775d322..6615023c3b21a 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -624,7 +624,7 @@ class AvroSuite extends SparkFunSuite { spark.read.avro("*/*/*/*/*/*/*/something.avro") } - intercept[FileNotFoundException] { + intercept[java.io.IOException] { TestUtils.withTempDir { dir => FileUtils.touch(new File(dir, "test")) spark.read.avro(dir.toString) @@ -812,7 +812,14 @@ class AvroSuite extends SparkFunSuite { } test("reading files without .avro extension") { - val df = spark.read.avro(episodesWithoutExtension) - assert(df.count == 16) + val df1 = spark.read.avro(episodesWithoutExtension) + assert(df1.count == 8) + + val schema = new StructType() + .add("title", StringType) + .add("air_date", StringType) + .add("doctor", IntegerType) + val df2 = spark.read.schema(schema).avro(episodesWithoutExtension) + assert(df2.count == 8) } } From 8562a8d43868d551efa6a0e9a00d50cdc838a178 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 14 Jul 2018 12:53:24 +0200 Subject: [PATCH 3/9] Adding ticket number to test title --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 6615023c3b21a..6b2f957a9ffc4 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -811,7 +811,7 @@ class AvroSuite extends SparkFunSuite { } } - test("reading files without .avro extension") { + test("SPARK-24805: reading files without .avro extension") { val df1 = spark.read.avro(episodesWithoutExtension) assert(df1.count == 8) From a7d078e439a8ea79260377a966dc31c2d3ae38f7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 15 Jul 2018 00:07:09 +0200 Subject: [PATCH 4/9] Revert expected exception in the test --- .../scala/org/apache/spark/sql/avro/AvroSuite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 6b2f957a9ffc4..89bb9261a830e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -624,10 +624,16 @@ class AvroSuite extends SparkFunSuite { spark.read.avro("*/*/*/*/*/*/*/something.avro") } - intercept[java.io.IOException] { + intercept[FileNotFoundException] { TestUtils.withTempDir { dir => FileUtils.touch(new File(dir, "test")) - spark.read.avro(dir.toString) + try { + spark.sqlContext.sparkContext + .hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "true") + spark.read.avro(dir.toString) + } finally { + spark.sqlContext.sparkContext + .hadoopConfiguration.unset("avro.mapred.ignore.inputs.without.extension") } } } From 3b75c2719abc1a6295f9fd64646853b4f1928575 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 15 Jul 2018 01:06:00 +0200 Subject: [PATCH 5/9] Fix tests --- .../org/apache/spark/sql/avro/AvroSuite.scala | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 89bb9261a830e..6c64581ec414e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -629,11 +629,13 @@ class AvroSuite extends SparkFunSuite { FileUtils.touch(new File(dir, "test")) try { spark.sqlContext.sparkContext - .hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "true") + .hadoopConfiguration + .set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") spark.read.avro(dir.toString) } finally { spark.sqlContext.sparkContext - .hadoopConfiguration.unset("avro.mapred.ignore.inputs.without.extension") } + .hadoopConfiguration + .unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) } } } @@ -695,12 +697,22 @@ class AvroSuite extends SparkFunSuite { Files.createFile(new File(tempSaveDir, "non-avro").toPath) - val newDf = spark - .read - .option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") - .avro(tempSaveDir) + val count = try { + spark.sqlContext.sparkContext + .hadoopConfiguration + .set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") + val newDf = spark + .read + .avro(tempSaveDir) + newDf.count() + } finally { + spark.sqlContext.sparkContext + .hadoopConfiguration + .unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) + -1 + } - assert(newDf.count == 8) + assert(count == 8) } } @@ -817,7 +829,7 @@ class AvroSuite extends SparkFunSuite { } } - test("SPARK-24805: reading files without .avro extension") { + test("SPARK-24805: do not ignore files without .avro extension by default") { val df1 = spark.read.avro(episodesWithoutExtension) assert(df1.count == 8) From a48770cd660bb82840851c7a9c4aca060272ea17 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 15 Jul 2018 22:19:25 +0200 Subject: [PATCH 6/9] Simplification of tests --- .../org/apache/spark/sql/avro/AvroSuite.scala | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 456b54c99bbc7..d3c15834cc145 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -619,15 +619,12 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { intercept[FileNotFoundException] { withTempPath { dir => FileUtils.touch(new File(dir, "test")) + val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration try { - spark.sqlContext.sparkContext - .hadoopConfiguration - .set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") + hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") spark.read.avro(dir.toString) } finally { - spark.sqlContext.sparkContext - .hadoopConfiguration - .unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) } + hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) } } } @@ -689,18 +686,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { Files.createFile(new File(tempSaveDir, "non-avro").toPath) + val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration val count = try { - spark.sqlContext.sparkContext - .hadoopConfiguration - .set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") + hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") val newDf = spark .read .avro(tempSaveDir) newDf.count() } finally { - spark.sqlContext.sparkContext - .hadoopConfiguration - .unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) + hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) -1 } From 91e40e7686b82aae1b34329e961a0a9eb20bec47 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 15 Jul 2018 22:42:10 +0200 Subject: [PATCH 7/9] Copying the episodes file intead of having duplicate in the resource folder. --- external/avro/src/test/resources/episodesAvro | Bin 597 -> 0 bytes .../org/apache/spark/sql/avro/AvroSuite.scala | 26 ++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) delete mode 100644 external/avro/src/test/resources/episodesAvro diff --git a/external/avro/src/test/resources/episodesAvro b/external/avro/src/test/resources/episodesAvro deleted file mode 100644 index 58a028ce19e6a1e964465dba8f9fbf5c84b3c3e5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 597 zcmZ`$$w~u35Ov8x#Dj=L5s_jL1qmVRM7@a%A}%2+f)b^isW@$Vx`*!0$Pn`jp8Nt& zeudv=PZl@uSoPMfKD&P$pU7gYWL|p#h4`N7Iwpz8*>)6pQu$8K5g4X3MNCVd^l+mi z^wPBcH1bcl6SZw%Sr`PO_y|f#X$L9-g z;dAr)w);_-ea$!*mc7p@CSd|NlpVELhMh<;4y8h|knQ6Gw{;CytVP*k1x_$Y;bL~} zP%34!WeX0_W;dkQhBBN}WGK8R1;wpeZEAH#z@;EmCg2I|28{bqD#NLaM + val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes" + Files.copy(Paths.get(episodesFile), Paths.get(fileWithoutExtension)) + + val df1 = spark.read.avro(fileWithoutExtension) + assert(df1.count == 8) + + val schema = new StructType() + .add("title", StringType) + .add("air_date", StringType) + .add("doctor", IntegerType) + val df2 = spark.read.schema(schema).avro(fileWithoutExtension) + assert(df2.count == 8) + } } } From 1c5325133a01d35e6637fea18046cc630c8ad097 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 16 Jul 2018 10:51:32 +0200 Subject: [PATCH 8/9] Fit test --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 7c6a93261ca3f..b90492def0a12 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -820,7 +820,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("SPARK-24805: do not ignore files without .avro extension by default") { withTempDir { dir => val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes" - Files.copy(Paths.get(episodesFile), Paths.get(fileWithoutExtension)) + Files.copy(Paths.get(episodesAvro), Paths.get(fileWithoutExtension)) val df1 = spark.read.avro(fileWithoutExtension) assert(df1.count == 8) From 85cdf871ab31d9fada6280917a66557c98938f3c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 16 Jul 2018 20:34:36 +0200 Subject: [PATCH 9/9] Fix test - using URI --- .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index b90492def0a12..446b42124ceca 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.avro import java.io._ +import java.net.URL import java.nio.file.{Files, Path, Paths} import java.sql.{Date, Timestamp} import java.util.{TimeZone, UUID} @@ -819,9 +820,11 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("SPARK-24805: do not ignore files without .avro extension by default") { withTempDir { dir => - val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes" - Files.copy(Paths.get(episodesAvro), Paths.get(fileWithoutExtension)) + Files.copy( + Paths.get(new URL(episodesAvro).toURI), + Paths.get(dir.getCanonicalPath, "episodes")) + val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes" val df1 = spark.read.avro(fileWithoutExtension) assert(df1.count == 8)