@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.reflect.ClassTag
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
 
@@ -44,6 +45,8 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
     kryo.register(classOf[scala.collection.Map[_,_]], new MapSerializer)
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
+    kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog],
+      new HyperLogLogSerializer)
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
     kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
     kryo.setReferences(false)
@@ -81,6 +84,20 @@ private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
   }
 }
 
+private[sql] class HyperLogLogSerializer extends Serializer[HyperLogLog] {
+  def write(kryo: Kryo, output: Output, hyperLogLog: HyperLogLog) {
+    val bytes = hyperLogLog.getBytes()
+    output.writeInt(bytes.length)
+    output.writeBytes(bytes)
+  }
+
+  def read(kryo: Kryo, input: Input, tpe: Class[HyperLogLog]): HyperLogLog = {
+    val length = input.readInt()
+    val bytes = input.readBytes(length)
+    HyperLogLog.Builder.build(bytes)
+  }
+}
+
 /**
  * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize
  * them as `Array[(k,v)]`.
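
For context (not part of the patch): a minimal round-trip sketch of how the new `HyperLogLogSerializer` behaves once registered with Kryo. It assumes the clearspring stream-lib and Kryo jars are on the classpath and that the snippet is compiled inside the `org.apache.spark.sql` package, since the serializer is `private[sql]`; the object name `HyperLogLogRoundTrip` and the precision value `12` are illustrative choices, not taken from the patch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object HyperLogLogRoundTrip {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    // Register HyperLogLog with the custom serializer, mirroring the
    // registration added to SparkSqlSerializer above.
    kryo.register(classOf[HyperLogLog], new HyperLogLogSerializer)

    // Build a sketch (12 bits of precision is an arbitrary example) and
    // offer some values to it.
    val hll = new HyperLogLog(12)
    (1 to 1000).foreach(i => hll.offer(s"value-$i"))

    // Serialize the sketch to a byte array ...
    val baos = new ByteArrayOutputStream()
    val output = new Output(baos)
    kryo.writeObject(output, hll)
    output.close()

    // ... and deserialize it back.
    val input = new Input(new ByteArrayInputStream(baos.toByteArray))
    val restored = kryo.readObject(input, classOf[HyperLogLog])
    input.close()

    // The estimated cardinality should survive the round trip, since the
    // serializer copies the sketch's raw bytes verbatim.
    assert(restored.cardinality() == hll.cardinality())
    println(s"estimated distinct count: ${restored.cardinality()}")
  }
}
```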