From 62b4b6d5789175f4d38c63d68b8c9f7f141ac17b Mon Sep 17 00:00:00 2001 From: rotems Date: Wed, 2 Dec 2015 00:33:01 +0200 Subject: [PATCH 1/8] SPARK-12080: Kryo - Support multiple user registrators --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d5ba690ed04be..06d5bde1e3fce 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -70,7 +70,7 @@ class KryoSerializer(conf: SparkConf) private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val userRegistrator = conf.getOption("spark.kryo.registrator") + private val userRegistrators = conf.get("spark.kryo.registrator", "").split(",").filter(!_.isEmpty) private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) @@ -119,7 +119,7 @@ class KryoSerializer(conf: SparkConf) classesToRegister .foreach { className => kryo.register(Class.forName(className, true, classLoader)) } // Allow the user to register their own classes by setting spark.kryo.registrator. - userRegistrator + userRegistrators .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]) .foreach { reg => reg.registerClasses(kryo) } // scalastyle:on classforname From 6924a86e18814f5d7afe3188ed224bf894f6a03f Mon Sep 17 00:00:00 2001 From: rotems Date: Wed, 2 Dec 2015 10:27:37 +0200 Subject: [PATCH 2/8] SPARK-12080: Kryo - Support multiple user registrators. Updated configuration documentation. --- docs/configuration.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 741d6b2b37a87..75fc8560ae050 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -647,10 +647,10 @@ Apart from these, the following properties are also available, and may be useful spark.kryo.registrator (none) - If you use Kryo serialization, set this class to register your custom classes with Kryo. This + If you use Kryo serialization, give a comma-separated list of classes that register your custom classes with Kryo. This property is useful if you need to register your classes in a custom way, e.g. to specify a custom field serializer. Otherwise spark.kryo.classesToRegister is simpler. It should be - set to a class that extends + set to classes that extends KryoRegistrator. See the tuning guide for more details. From ca00a134eb2e3803fcd1e032be505ef8c0735ed1 Mon Sep 17 00:00:00 2001 From: rotems Date: Wed, 2 Dec 2015 11:09:00 +0200 Subject: [PATCH 3/8] SPARK-12080: Kryo - Support multiple user registrators. 
Changing String to Char --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 06d5bde1e3fce..db1d19abcb75c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -70,7 +70,7 @@ class KryoSerializer(conf: SparkConf) private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val userRegistrators = conf.get("spark.kryo.registrator", "").split(",").filter(!_.isEmpty) + private val userRegistrators = conf.get("spark.kryo.registrator", "").split(',').filter(!_.isEmpty) private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) From 29e519a36b9ed0a664cd71bcc561d088f25ba2c0 Mon Sep 17 00:00:00 2001 From: rotems Date: Wed, 2 Dec 2015 11:32:32 +0200 Subject: [PATCH 4/8] SPARK-12080: Kryo - Support multiple user registrators. Improving style --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index db1d19abcb75c..7b77f78ce6f1a 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -70,7 +70,9 @@ class KryoSerializer(conf: SparkConf) private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val userRegistrators = conf.get("spark.kryo.registrator", "").split(',').filter(!_.isEmpty) + private val userRegistrators = conf.get("spark.kryo.registrator", "") + .split(',') + .filter(!_.isEmpty) private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) From 6a4cb9bcd59e5f7f229f908406004e2d859552fd Mon Sep 17 00:00:00 2001 From: rotems Date: Fri, 4 Dec 2015 01:50:02 +0200 Subject: [PATCH 5/8] SPARK-12080: Kryo - Support multiple user registrators. Doc typo --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 75fc8560ae050..7b9235361bcc6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -650,7 +650,7 @@ Apart from these, the following properties are also available, and may be useful If you use Kryo serialization, give a comma-separated list of classes that register your custom classes with Kryo. This property is useful if you need to register your classes in a custom way, e.g. to specify a custom field serializer. Otherwise spark.kryo.classesToRegister is simpler. It should be - set to classes that extends + set to classes that extend KryoRegistrator. See the tuning guide for more details. 
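Taken together, patches 1-5 let spark.kryo.registrator accept a comma-separated list of KryoRegistrator implementations instead of a single class. A minimal usage sketch follows; the two registrator classes and the com.example package are hypothetical, invented here for illustration only:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator
    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical registrators; each one registers its own group of classes.
    class PointRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Array[Double]])
      }
    }

    class ShapeRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Array[Int]])
      }
    }

    object MultiRegistratorExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("multi-registrator-example")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          // With the patches above, the value may be a comma-separated list
          // of fully-qualified class names rather than a single class.
          .set("spark.kryo.registrator",
            "com.example.PointRegistrator,com.example.ShapeRegistrator")
        val sc = new SparkContext(conf)
        sc.stop()
      }
    }
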
From 1e9f50fce8beb39ddfd1a07d3e82dd097f095ea4 Mon Sep 17 00:00:00 2001 From: rotems Date: Tue, 8 Dec 2015 09:24:02 +0200 Subject: [PATCH 6/8] SPARK-12197: Kryo - Support Avro SchemaRepo --- .../spark/serializer/KryoSerializer.scala | 21 ++-- .../{ => avro}/GenericAvroSerializer.scala | 104 ++++++++++----- .../spark/serializer/avro/SchemaRepo.scala | 86 +++++++++++++ .../GenericAvroSerializerSuite.scala | 119 ++++++++++++++++-- docs/configuration.md | 10 ++ 5 files changed, 296 insertions(+), 44 deletions(-) rename core/src/main/scala/org/apache/spark/serializer/{ => avro}/GenericAvroSerializer.scala (69%) create mode 100644 core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 7b77f78ce6f1a..35d2d552eabff 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -21,25 +21,25 @@ import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, O import java.nio.ByteBuffer import javax.annotation.Nullable -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag - import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} -import org.roaringbitmap.RoaringBitmap - import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} +import org.apache.spark.serializer.avro.{GenericAvroSerializer, EmptySchemaRepo, SchemaRepo} import org.apache.spark.storage._ import org.apache.spark.util.collection.CompactBuffer import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils} +import org.roaringbitmap.RoaringBitmap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag /** * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. 
@@ -78,6 +78,7 @@ class KryoSerializer(conf: SparkConf) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema + private val avroSchemaRepo = conf.getOption(SchemaRepo.SCHEMA_REPO_KEY) def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -110,8 +111,12 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) - kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) - kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) + val schemaRepo = SchemaRepo(conf) match { + case Some(repo) => repo + case None => EmptySchemaRepo + } + kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, schemaRepo)) + kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, schemaRepo)) try { // scalastyle:off classforname diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/avro/GenericAvroSerializer.scala similarity index 69% rename from core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala rename to core/src/main/scala/org/apache/spark/serializer/avro/GenericAvroSerializer.scala index 62f8aae7f2126..2c5030fdd5e1d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/avro/GenericAvroSerializer.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.serializer +package org.apache.spark.serializer.avro import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer -import scala.collection.mutable - -import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import org.apache.avro.{Schema, SchemaNormalization} +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import org.apache.avro.Schema.Parser import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} import org.apache.commons.io.IOUtils - -import org.apache.spark.{SparkException, SparkEnv} import org.apache.spark.io.CompressionCodec +import org.apache.spark.{SparkEnv, SparkException} + +import scala.collection.mutable /** * Custom serializer used for generic Avro records. If the user registers the schemas @@ -42,7 +42,8 @@ import org.apache.spark.io.CompressionCodec * string representation of the Avro schema, used to decrease the amount of data * that needs to be serialized. 
 */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String],
+                                                schemaRepo: SchemaRepo = EmptySchemaRepo)
   extends KSerializer[GenericRecord] {

   /** Used to reduce the amount of effort to compress the schema */
@@ -93,6 +94,40 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
     val encoder = EncoderFactory.get.binaryEncoder(output, null)
     val schema = datum.getSchema
+
+    serializeSchema(datum, schema, output)
+
+    writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
+      .asInstanceOf[DatumWriter[R]]
+      .write(datum, encoder)
+    encoder.flush()
+  }
+
+  /**
+   * Deserializes generic records into their in-memory form. There is internal
+   * state to keep a cache of already seen schemas and datum readers.
+   */
+  def deserializeDatum(input: KryoInput): GenericRecord = {
+    val schema: Schema = deserializeSchema(input)
+
+    val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
+    readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
+      .asInstanceOf[DatumReader[GenericRecord]]
+      .read(null, decoder)
+  }
+
+
+  /**
+   * Serialize the schema of a datum.
+   * Step 1: Calculate the schema's fingerprint using Avro's SchemaNormalization mechanism.
+   * Step 2: Use the fingerprint to look up the schema in the pre-registered schemas; if found, serialize the fingerprint, else go to step 3.
+   * Step 3: Use the SchemaRepo to find the schemaId of the record; if found, serialize the schemaId as the fingerprint, else go to step 4.
+   * Step 4: Serialize the entire schema, with an indicator of this behavior.
+   * @param datum - datum to extract the schema id from
+   * @param schema - schema to serialize
+   * @param output - kryo output
+   */
+  private def serializeSchema[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput): Unit = {
     val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
       SchemaNormalization.parsingFingerprint64(schema)
     })
@@ -101,34 +136,48 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
         output.writeBoolean(true)
         output.writeLong(fingerprint)
       case None =>
-        output.writeBoolean(false)
-        val compressedSchema = compress(schema)
-        output.writeInt(compressedSchema.length)
-        output.writeBytes(compressedSchema)
+        schemaRepo.extractSchemaId(datum) match {
+          case Some(schemaId) if schemaRepo.contains(schemaId) =>
+            output.writeBoolean(true)
+            output.writeLong(schemaId)
+          case _ =>
+            output.writeBoolean(false)
+            val compressedSchema = compress(schema)
+            output.writeInt(compressedSchema.length)
+            output.writeBytes(compressedSchema)
+        }
     }
-
-    writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
-      .asInstanceOf[DatumWriter[R]]
-      .write(datum, encoder)
-    encoder.flush()
   }

+
   /**
-   * Deserializes generic records into their in-memory form. There is internal
-   * state to keep a cache of already seen schemas and datum readers.
+   * Deserialize the schema.
+   * If the fingerprint indicator boolean is false:
+   *   1: Deserialize the schema itself from the following bytes.
+   * If the fingerprint indicator boolean is true:
+   *   Step 1: Search for the schema in the explicitly registered schemas using the fingerprint. If the schema was not found, move to step 2.
+   *   Step 2: Search the schema repository using the fingerprint. At that point, if the schema is still not found, throw an exception.
+   * @param input KryoInput
+   * @return the deserialized schema
    */
-  def deserializeDatum(input: KryoInput): GenericRecord = {
+  private def deserializeSchema(input: KryoInput): Schema = {
     val schema = {
       if (input.readBoolean()) {
         val fingerprint = input.readLong()
         schemaCache.getOrElseUpdate(fingerprint, {
           schemas.get(fingerprint) match {
-            case Some(s) => new Schema.Parser().parse(s)
+            case Some(s) => new Parser().parse(s)
             case None =>
-              throw new SparkException(
-                "Error reading attempting to read avro data -- encountered an unknown " +
-                  s"fingerprint: $fingerprint, not sure what schema to use. This could happen " +
-                  "if you registered additional schemas after starting your spark context.")
+              schemaRepo.getSchema(fingerprint) match {
+                case Some(resSchema) => resSchema
+                case None =>
+                  throw new SparkException(
+                    s"""Error while attempting to read avro data --
+                       |encountered an unknown fingerprint: $fingerprint, not sure what schema to use.
+                       |This could happen if you registered additional schemas after starting your
+                       |spark context.""".stripMargin)
+              }
+
           }
         })
       } else {
@@ -136,10 +185,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
         decompress(ByteBuffer.wrap(input.readBytes(length)))
       }
     }
-    val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
-    readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
-      .asInstanceOf[DatumReader[GenericRecord]]
-      .read(null, decoder)
+    schema
   }
 
   override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
diff --git a/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala
new file mode 100644
index 0000000000000..659eb1a0b8503
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala
@@ -0,0 +1,86 @@
+package org.apache.spark.serializer.avro
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * Created by rotems on 12/6/15.
+ */
+/**
+ * A schema repository for avro records.
+ * This repo assumes that it is possible to extract the schemaId of a record from the record itself.
+ * @param config sparkConf for configuration purposes
+ */
+abstract class SchemaRepo(config: SparkConf) {
+
+  /**
+   * Retrieve from the repo an avro schema, as a string, by its ID
+   * @param schemaId - the schemaId
+   * @return schema if found, none otherwise
+   */
+  def getRawSchema(schemaId: Long): Option[String]
+
+  /**
+   * Extract the schemaId from a record.
+   * @param record current avro record
+   * @return schemaId if managed to extract, none otherwise
+   */
+  def extractSchemaId(record: GenericRecord): Option[Long]
+
+  /**
+   * Checks whether the schema repository contains the given schemaId
+   * @param schemaId - the schemaId
+   * @return true if found in repo, false otherwise.
+   */
+  def contains(schemaId: Long): Boolean
+
+  /**
+   * Get a schema from the repo, by schemaId, as a parsed Schema
+   * @param schemaId - the schemaId
+   * @return schema if found, none otherwise
+   */
+  def getSchema(schemaId: Long): Option[Schema] = {
+    getRawSchema(schemaId) match {
+      case Some(s) => Some(new Schema.Parser().parse(s))
+      case None => None
+
+    }
+  }
+
+}
+
+object SchemaRepo extends Logging {
+  val SCHEMA_REPO_KEY = "spark.kryo.avro.schema.repo"
+
+  /**
+   * Create a schemaRepo using SparkConf
+   * @param conf - spark conf used to configure the repo.
+   * @return the instantiated SchemaRepo, or None if anything goes wrong
+   */
+  def apply(conf: SparkConf): Option[SchemaRepo] = {
+    try {
+      conf.getOption(SCHEMA_REPO_KEY) match {
+        case Some(clazz) => Some(Class.forName(clazz).getConstructor(classOf[SparkConf])
+          .newInstance(conf).asInstanceOf[SchemaRepo])
+        case None => None
+      }
+    } catch {
+      case t: Throwable =>
+        log.error("Failed to build schema repo.", t)
+        None
+    }
+  }
+}
+
+/**
+ * A dummpy empty schema repository.
+ */
+object EmptySchemaRepo extends SchemaRepo(null) {
+
+  override def getRawSchema(schemaId: Long): Option[String] = None
+
+  override def extractSchemaId(record: GenericRecord): Option[Long] = None
+
+  override def contains(schemaId: Long): Boolean = false
+}
\ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index 87f25e7245e1f..3ceaa2d145677 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -17,24 +17,21 @@
 
 package org.apache.spark.serializer
 
+import org.apache.spark.serializer.GenericAvroSerializerSuite._
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{SchemaBuilder, Schema}
 import org.apache.avro.generic.GenericData.Record
+import org.apache.spark.serializer.avro.{GenericAvroSerializer, SchemaRepo}
 
-import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+import org.apache.spark.{SparkConf, SparkFunSuite, SharedSparkContext}
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
-    .record("testRecord").fields()
-    .requiredString("data")
-    .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
 
   test("schema compression and decompression") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
@@ -81,4 +78,112 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
     assert(compressedSchema.eq(genericSer.compress(schema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  test("found in schema repository") {
+    val schemaRepo = new TestSchemaRepo(conf)
+    val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSerFingerWithRepo.serializeDatum(record, output)
+
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSerFingerWithRepo.deserializeDatum(input) === record)
+
+  }
+
+  test("extracted schemaId which is missing from schema repository") {
+    val schemaRepo = new TestSchemaRepo(conf)
+    val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSerFingerWithRepo.serializeDatum(record2, output)
+
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSerFingerWithRepo.deserializeDatum(input) === record2)
+  }
+
+  test("no schemaId extracted from record") {
extracted from record") { + val schemaRepo = new TestSchemaRepo(conf) + val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo) + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + genericSerFingerWithRepo.serializeDatum(record3, output) + + output.flush() + output.close() + + val input = new Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(genericSerFingerWithRepo.deserializeDatum(input) === record3) + } + + test("registered schemas takes precedence over schema repository") { + conf.registerAvroSchemas(schema) + val schemaRepo = new TestSchemaRepo(conf) + val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo) + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + genericSerFingerWithRepo.serializeDatum(record2, output) + + output.flush() + output.close() + + val input = new Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(genericSerFingerWithRepo.deserializeDatum(input) === record2) + } + + class TestSchemaRepo(conf: SparkConf) extends SchemaRepo(conf) { + val repo = Map[Long,String](1L -> schema.toString) + /** + * Receive from repo an avro schema as string by its ID + * @param schemaId - the schemaId + * @return schema if found, none otherwise + */ + override def getRawSchema(schemaId: Long): Option[String] = { + repo.get(schemaId) + } + + /** + * Extract schemaId from record. + * @param r current avro record + * @return schemaId if managed to extract, none otherwise + */ + override def extractSchemaId(r: GenericRecord): Option[Long] = { + if(r equals record) Some(1L) + else if(r equals record2) Some(2L) + else None + } + + /** + * Checks whether the schema repository contains the following schemaId + * @param schemaId - the schemaId + * @return true if found in repo, false otherwise. + */ + override def contains(schemaId: Long): Boolean = repo.contains(schemaId) + } +} + +object GenericAvroSerializerSuite { + val schema : Schema = SchemaBuilder + .record("testRecord").fields() + .requiredString("data") + .endRecord() + val record = new Record(schema) + record.put("data", "test data") + + val record2 = new Record(schema) + record2.put("data", "test data2") + + val record3 = new Record(schema) + record3.put("data", "test data3") + } diff --git a/docs/configuration.md b/docs/configuration.md index 7b9235361bcc6..cd0cdb27a0d2c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -656,6 +656,16 @@ Apart from these, the following properties are also available, and may be useful See the tuning guide for more details. + + spark.kryo.avro.schema.repo + (none) + + If you use Kryo serialization and Avro's GenericRecords, give a schema repository class implementation to improve shuffle performance. This + property is useful if you need to serialize GenericRecord and/or GenericData objects and don't know their schema ahead of time. + Otherwise SparkConf.registerAvroSchemas is simpler. 
+    It should be set to a class that extends
+    org.apache.spark.serializer.avro.SchemaRepo.
+
+
 
   spark.kryoserializer.buffer.max
   64m

From e44c7947897d2bbd44444bf1395c3220a5806b9f Mon Sep 17 00:00:00 2001
From: rotems
Date: Tue, 8 Dec 2015 09:57:33 +0200
Subject: [PATCH 7/8] SPARK-12197: Kryo - Support Avro SchemaRepo Removed
 unnecessary line

---
 .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index ec4985a336ff0..70c1aad519ff2 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -78,7 +78,6 @@ class KryoSerializer(conf: SparkConf)
     .filter(!_.isEmpty)
 
   private val avroSchemas = conf.getAvroSchema
-  private val avroSchemaRepo = conf.getOption(SchemaRepo.SCHEMA_REPO_KEY)
 
   def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 

From ad5b27af9f52ff90558aad92e03a236de4197f34 Mon Sep 17 00:00:00 2001
From: rotems
Date: Fri, 11 Dec 2015 23:33:12 +0200
Subject: [PATCH 8/8] SPARK-12197: Kryo - Support Avro SchemaRepo - Fixing typo

---
 .../scala/org/apache/spark/serializer/avro/SchemaRepo.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala
index 659eb1a0b8503..3407217f6f597 100644
--- a/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala
@@ -74,7 +74,7 @@ object SchemaRepo extends Logging {
 }
 
 /**
- * A dummpy empty schema repository.
+ * A dummy empty schema repository.
  */
 object EmptySchemaRepo extends SchemaRepo(null) {
 
@@ -83,4 +83,4 @@ object EmptySchemaRepo extends SchemaRepo(null) {
   override def extractSchemaId(record: GenericRecord): Option[Long] = None
 
   override def contains(schemaId: Long): Boolean = false
-}
\ No newline at end of file
+}
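
To illustrate the extension point added in PATCH 6/8, here is a minimal sketch of a custom SchemaRepo. The InMemorySchemaRepo name, the backing map, and the "schemaId" record field are assumptions made for this sketch, not part of the patch; a production implementation would typically query an external schema registry. The single SparkConf constructor parameter is required, because SchemaRepo.apply instantiates the class reflectively via getConstructor(classOf[SparkConf]).

    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.avro.SchemaRepo

    import scala.util.Try

    // Hypothetical repository backed by a static in-memory map.
    class InMemorySchemaRepo(conf: SparkConf) extends SchemaRepo(conf) {

      private val schemas: Map[Long, String] = Map(
        1L -> """{"type":"record","name":"testRecord","fields":[{"name":"data","type":"string"}]}"""
      )

      override def getRawSchema(schemaId: Long): Option[String] = schemas.get(schemaId)

      // Assumes each record carries its schema id in a "schemaId" field; records
      // without such a field fall back to serializing the full schema.
      override def extractSchemaId(record: GenericRecord): Option[Long] =
        Option(record.get("schemaId")).flatMap(v => Try(v.toString.toLong).toOption)

      override def contains(schemaId: Long): Boolean = schemas.contains(schemaId)
    }

    // Enabled through the new configuration key, e.g.:
    //   conf.set("spark.kryo.avro.schema.repo", "com.example.InMemorySchemaRepo")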