From 9da3a98d4e68697a0f4ab2b21be4b6aebeeb25e3 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 18 Jul 2018 20:50:37 +0200
Subject: [PATCH 1/2] Adding a class for Avro Options

---
 .../apache/spark/sql/avro/AvroOptions.scala   | 30 +++++++++++++++++++
 1 file changed, 30 insertions(+)
 create mode 100644 external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
new file mode 100644
index 0000000000000..7cb5472eae6ce
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the Avro Writer and Reader, stored in a case-insensitive manner.
+ */
+class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
+  extends Logging with Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+}
\ No newline at end of file

From 3a76ba2de635fb6f653ee1d814ed0b87bcb12fb6 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 18 Jul 2018 23:37:08 +0200
Subject: [PATCH 2/2] Gathering all options in AvroOptions

---
 .../spark/sql/avro/AvroFileFormat.scala       | 13 +++++------
 .../apache/spark/sql/avro/AvroOptions.scala   | 22 +++++++++++++++++--
 .../org/apache/spark/sql/avro/AvroSuite.scala |  6 +++--
 3 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 9eb206457809c..1d0f40e1ce92a 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -58,6 +58,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
+    val parsedOptions = new AvroOptions(options)
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
@@ -76,7 +77,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       }
 
       // User can specify an optional avro json schema.
-      val avroSchema = options.get(AvroFileFormat.AvroSchema)
+      val avroSchema = parsedOptions.schema
         .map(new Schema.Parser().parse)
         .getOrElse {
           val in = new FsInput(sampleFile.getPath, conf)
@@ -114,10 +115,9 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val recordName = options.getOrElse("recordName", "topLevelRecord")
-    val recordNamespace = options.getOrElse("recordNamespace", "")
+    val parsedOptions = new AvroOptions(options)
     val outputAvroSchema = SchemaConverters.toAvroType(
-      dataSchema, nullable = false, recordName, recordNamespace)
+      dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
 
     val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
@@ -160,11 +160,12 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+    val parsedOptions = new AvroOptions(options)
 
     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
       val conf = broadcastedConf.value.value
-      val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+      val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
 
       // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
       // Doing input file filtering is improper because we may generate empty tasks that process no
@@ -235,8 +236,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 private[avro] object AvroFileFormat {
   val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
 
-  val AvroSchema = "avroSchema"
-
   class SerializableConfiguration(@transient var value: Configuration)
       extends Serializable with KryoSerializable {
     @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 7cb5472eae6ce..8721eae3481da 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -21,10 +21,28 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
- * Options for the Avro Writer and Reader, stored in a case-insensitive manner.
+ * Options for the Avro Reader and Writer, stored in a case-insensitive manner.
  */
 class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
   extends Logging with Serializable {
 
   def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
-}
\ No newline at end of file
+
+  /**
+   * Optional schema provided by a user in JSON format.
+   */
+  val schema: Option[String] = parameters.get("avroSchema")
+
+  /**
+   * Top level record name in the write result, which is required by the Avro spec.
+   * See https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
+   * Default value is "topLevelRecord".
+   */
+  val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
+
+  /**
+   * Record namespace in the write result. Default value is "".
+   * See the Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
+   */
+  val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 446b42124ceca..f7e9877b7744b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -578,7 +578,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       """.stripMargin
     val result = spark
       .read
-      .option(AvroFileFormat.AvroSchema, avroSchema)
+      .option("avroSchema", avroSchema)
       .avro(testAvro)
       .collect()
     val expected = spark.read.avro(testAvro).select("string").collect()
@@ -598,7 +598,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       |    }]
       |}
     """.stripMargin
-    val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema)
+    val result = spark
+      .read
+      .option("avroSchema", avroSchema)
       .avro(testAvro).select("missingField").first
     assert(result === Row("foo"))
   }
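Note for reviewers (not part of the patches above): a minimal sketch of how the new AvroOptions class behaves once patch 2 is applied. The object name AvroOptionsSketch and the option values are illustrative only; the point is that option keys resolve case-insensitively because the backing map is a CaseInsensitiveMap.

    import org.apache.spark.sql.avro.AvroOptions

    object AvroOptionsSketch extends App {
      // Mixed-case key on purpose: CaseInsensitiveMap normalizes lookups.
      val parsedOptions = new AvroOptions(Map(
        "RecordName" -> "person",
        "recordNamespace" -> "com.example"))

      assert(parsedOptions.recordName == "person")           // found despite the casing
      assert(parsedOptions.recordNamespace == "com.example") // exact-case key also works
      assert(parsedOptions.schema.isEmpty)                   // "avroSchema" was not set
    }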
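And a sketch of the end-user surface these options feed, mirroring the reader calls exercised in AvroSuite. The output path and the schema JSON are placeholders, and .format("avro") assumes the data source's registered short name is on the classpath; this is not a claim about the final public API.

    import org.apache.spark.sql.SparkSession

    object AvroOptionsEndToEnd extends App {
      val spark = SparkSession.builder().master("local[*]").getOrCreate()

      // Write: recordName/recordNamespace shape the top-level record of the output schema.
      spark.range(3).toDF("id")
        .write
        .format("avro")
        .option("recordName", "idRecord")         // hypothetical record name
        .option("recordNamespace", "com.example") // hypothetical namespace
        .save("/tmp/avro-out")                    // placeholder path

      // Read: avroSchema pins the reader schema instead of inferring it from a sample file.
      val df = spark.read
        .format("avro")
        .option("avroSchema",
          """{"type": "record", "name": "idRecord", "namespace": "com.example",
            |  "fields": [{"name": "id", "type": "long"}]}""".stripMargin)
        .load("/tmp/avro-out")

      df.show()
      spark.stop()
    }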