@@ -21,25 +21,25 @@ import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, O
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.serializer.avro.{EmptySchemaRepo, GenericAvroSerializer, SchemaRepo}
import org.apache.spark.storage._
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}

/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -110,8 +110,12 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

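// If "spark.kryo.avro.schema.repo" names a SchemaRepo implementation, use it so that generic
// Avro records whose schemas were not pre-registered can still be serialized by schema id;
// otherwise fall back to the no-op EmptySchemaRepo, and unknown schemas are shipped in full.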
val schemaRepo = SchemaRepo(conf).getOrElse(EmptySchemaRepo)
kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, schemaRepo))
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, schemaRepo))

try {
// scalastyle:off classforname
@@ -15,22 +15,22 @@
* limitations under the License.
*/

package org.apache.spark.serializer.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
import org.apache.commons.io.IOUtils

import org.apache.spark.io.CompressionCodec
import org.apache.spark.{SparkEnv, SparkException}

/**
* Custom serializer used for generic Avro records. If the user registers the schemas
@@ -42,7 +42,8 @@ import org.apache.spark.io.CompressionCodec
* string representation of the Avro schema, used to decrease the amount of data
* that needs to be serialized.
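* If a schema repository is configured, it is also consulted for records whose schemas were
* not explicitly registered, before falling back to serializing the full schema.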
*/
private[serializer] class GenericAvroSerializer(schemas: Map[Long, String],
schemaRepo: SchemaRepo = EmptySchemaRepo)
extends KSerializer[GenericRecord] {

/** Used to reduce the amount of effort to compress the schema */
@@ -96,6 +97,40 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
val encoder = EncoderFactory.get.binaryEncoder(output, null)
val schema = datum.getSchema

serializeSchema(datum, schema, output)

writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
encoder.flush()
}

/**
* Deserializes generic records into their in-memory form. There is internal
* state to keep a cache of already seen schemas and datum readers.
*/
def deserializeDatum(input: KryoInput): GenericRecord = {
val schema: Schema = deserializeSchema(input)

val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null, decoder)
}


/**
* Serializes a schema.
* Step 1: Compute the schema's fingerprint using Avro's SchemaNormalization mechanism.
* Step 2: Look the fingerprint up in the pre-registered schemas; if found, serialize the
*         fingerprint, otherwise go to step 3.
* Step 3: Ask the SchemaRepo for the record's schemaId; if found, serialize the schemaId in
*         place of the fingerprint, otherwise go to step 4.
* Step 4: Serialize the entire schema, preceded by an indicator of this case.
* @param datum - datum to extract the schema id from
* @param schema - schema to serialize
* @param output - kryo output
*/
private def serializeSchema[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput) = {
val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
SchemaNormalization.parsingFingerprint64(schema)
})
Expand All @@ -104,45 +139,56 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
output.writeBoolean(true)
output.writeLong(fingerprint)
case None =>
schemaRepo.extractSchemaId(datum) match {
case Some(schemaId) if schemaRepo.contains(schemaId) =>
output.writeBoolean(true)
output.writeLong(schemaId)
case _ =>
output.writeBoolean(false)
val compressedSchema = compress(schema)
output.writeInt(compressedSchema.length)
output.writeBytes(compressedSchema)
}
}

}
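// Wire layout written by serializeSchema and read back by deserializeSchema:
//   known fingerprint or repo schema id -> Boolean(true), Long(fingerprint or schemaId)
//   unknown schema                      -> Boolean(false), Int(length), Byte[length] (compressed schema)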


/**
* Deserializes a schema.
* If the fingerprint-indicator boolean is false:
*   Deserialize the full schema from the bytes that follow.
* If the fingerprint-indicator boolean is true:
*   Step 1: Look the fingerprint up in the explicitly registered schemas. If the schema is
*           not found, move to step 2.
*   Step 2: Look the fingerprint up in the schema repository. If the schema is still not
*           found, throw an exception.
* @param input KryoInput
* @return the deserialized schema
*/
private def deserializeSchema(input: KryoInput): Schema = {
val schema = {
if (input.readBoolean()) {
val fingerprint = input.readLong()
schemaCache.getOrElseUpdate(fingerprint, {
schemas.get(fingerprint) match {
case Some(s) => new Parser().parse(s)
case None =>
schemaRepo.getSchema(fingerprint) match {
case Some(resolvedSchema) => resolvedSchema
case None =>
throw new SparkException(
s"""Error while attempting to read avro data --
|encountered an unknown fingerprint: $fingerprint, not sure what schema to use.
|The fingerprint was found neither in the registered schemas nor in the schema repository.
|This could happen if you registered additional schemas after starting your
|spark context.""".stripMargin)
}

}
})
} else {
val length = input.readInt()
decompress(ByteBuffer.wrap(input.readBytes(length)))
}
}
schema
}

override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
@@ -0,0 +1,86 @@
package org.apache.spark.serializer.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.{Logging, SparkConf}

/**
* A schema repository for avro records.
* This repo assumes that it is possible to extract the schemaId of a record from the record itself.
* @param config SparkConf used to configure the repository
*/
abstract class SchemaRepo(config: SparkConf) {

/**
* Fetch the raw Avro schema (as a string) from the repository by its id
* @param schemaId - the schemaId
* @return schema if found, none otherwise
*/
def getRawSchema(schemaId: Long): Option[String]

/**
* Extract schemaId from record.
* @param record current avro record
* @return schemaId if managed to extract, none otherwise
*/
def extractSchemaId(record: GenericRecord): Option[Long]

/**
* Checks whether the schema repository contains the given schemaId
* @param schemaId - the schemaId
* @return true if found in repo, false otherwise.
*/
def contains(schemaId: Long): Boolean

/**
* Get the parsed Schema from the repository by its schemaId
* @param schemaId - the schemaId
* @return schema if found, none otherwise
*/
def getSchema(schemaId: Long): Option[Schema] = {
getRawSchema(schemaId).map(raw => new Schema.Parser().parse(raw))
}

}

object SchemaRepo extends Logging {
val SCHEMA_REPO_KEY = "spark.kryo.avro.schema.repo"

/**
* Create a schemaRepo using SparkConf
* @param conf - spark conf used to configure the repo.
* @return the instantiated SchemaRepo, or None if construction fails
*/
def apply(conf: SparkConf): Option[SchemaRepo] = {
try {
conf.getOption(SCHEMA_REPO_KEY) match {
case Some(clazz) => Some(Class.forName(clazz).getConstructor(classOf[SparkConf])
.newInstance(conf).asInstanceOf[SchemaRepo])
case None => None
}
} catch {
case t: Throwable =>
log.error(s"Failed to build schema repo. ", t)
None
}
}
}
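// Example (hypothetical implementation class name) of enabling a custom repository via SparkConf:
//   conf.set(SchemaRepo.SCHEMA_REPO_KEY, "com.example.avro.ConfBackedSchemaRepo")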

/**
* A dummy empty schema repository.
*/
object EmptySchemaRepo extends SchemaRepo(null) {

override def getRawSchema(schemaId: Long): Option[String] = None

override def extractSchemaId(record: GenericRecord): Option[Long] = None

override def contains(schemaId: Long): Boolean = false
}
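
For reference, a custom repository might look like the minimal sketch below. The class name, the configuration key prefix, and the assumption that every record carries a "schemaId" field are illustrative only and are not part of this change; the one requirement imposed by SchemaRepo.apply above is a public constructor that accepts a SparkConf.

package com.example.avro  // hypothetical package, not part of this PR

import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.serializer.avro.SchemaRepo

/**
 * Minimal sketch: serves schemas that were packed into the SparkConf under keys of the form
 * "com.example.avro.schema.<schemaId>", and assumes each record exposes its schema id in a
 * field named "schemaId". Both conventions are assumptions made for illustration.
 */
class ConfBackedSchemaRepo(conf: SparkConf) extends SchemaRepo(conf) {

  private val prefix = "com.example.avro.schema."

  // Raw schema JSON is looked up directly in the SparkConf.
  override def getRawSchema(schemaId: Long): Option[String] =
    conf.getOption(prefix + schemaId)

  // Pull the schema id out of the record itself, if the field is present.
  override def extractSchemaId(record: GenericRecord): Option[Long] =
    Option(record.get("schemaId")).map(_.toString.toLong)

  override def contains(schemaId: Long): Boolean =
    conf.contains(prefix + schemaId)
}

It would then be enabled with conf.set(SchemaRepo.SCHEMA_REPO_KEY, "com.example.avro.ConfBackedSchemaRepo"); if the key is absent, or construction throws, the failure is logged and KryoSerializer falls back to EmptySchemaRepo.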