From e97d7f92ad4cb075234772c24f246bf51eff6cc7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 24 Jan 2016 18:50:40 -0800 Subject: [PATCH 1/5] CountMinSketch serialization --- .../spark/util/sketch/CountMinSketch.java | 54 +++++++++++- .../spark/util/sketch/CountMinSketchImpl.java | 83 ++++++++++++++++--- .../sketch/CountMinSketchMergeException.java | 24 ++++++ .../util/sketch/CountMinSketchSuite.scala | 38 +++++++++ 4 files changed, 183 insertions(+), 16 deletions(-) create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index 21b161bc74ae..2d0aa13ae189 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -17,6 +17,8 @@ package org.apache.spark.util.sketch; +import java.io.DataInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -54,6 +56,25 @@ * This implementation is largely based on the {@code CountMinSketch} class from stream-lib. */ abstract public class CountMinSketch { + /** + * Version number of the serialized binary format. + */ + public enum Version { + V1(1); + + private final int versionNumber; + + Version(int versionNumber) { + this.versionNumber = versionNumber; + } + + public int getVersionNumber() { + return versionNumber; + } + } + + public abstract Version version(); + /** * Returns the relative error (or {@code eps}) of this {@link CountMinSketch}. */ @@ -99,19 +120,44 @@ abstract public class CountMinSketch { * * Note that only Count-Min sketches with the same {@code depth}, {@code width}, and random seed * can be merged. + * + * @exception CountMinSketchMergeException if the {@code other} {@link CountMinSketch} has + * incompatible depth, width, relative-error, confidence, or random seed. */ - public abstract CountMinSketch mergeInPlace(CountMinSketch other); + public abstract CountMinSketch mergeInPlace(CountMinSketch other) + throws CountMinSketchMergeException; /** * Writes out this {@link CountMinSketch} to an output stream in binary format. */ - public abstract void writeTo(OutputStream out); + public abstract void writeTo(OutputStream out) throws IOException; /** * Reads in a {@link CountMinSketch} from an input stream. */ - public static CountMinSketch readFrom(InputStream in) { - throw new UnsupportedOperationException("Not implemented yet"); + public static CountMinSketch readFrom(InputStream in) throws IOException { + DataInputStream dis = new DataInputStream(in); + + // Ignores version number + dis.readInt(); + + long totalCount = dis.readLong(); + int depth = dis.readInt(); + int width = dis.readInt(); + + long hashA[] = new long[depth]; + for (int i = 0; i < depth; ++i) { + hashA[i] = dis.readLong(); + } + + long table[][] = new long[depth][width]; + for (int i = 0; i < depth; ++i) { + for (int j = 0; j < width; ++j) { + table[i][j] = dis.readLong(); + } + } + + return new CountMinSketchImpl(depth, width, totalCount, hashA, table); } /** diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index e9fdbe3a8686..43cb8af10adb 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -17,6 +17,8 @@ package org.apache.spark.util.sketch; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.util.Arrays; @@ -51,6 +53,53 @@ public CountMinSketchImpl(double eps, double confidence, int seed) { initTablesWith(depth, width, seed); } + public CountMinSketchImpl(int depth, int width, long totalCount, long hashA[], long table[][]) { + this.depth = depth; + this.width = width; + this.eps = 2.0 / width; + this.confidence = 1 - 1 / Math.pow(2, depth); + this.hashA = hashA; + this.table = table; + this.totalCount = totalCount; + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (!(other instanceof CountMinSketchImpl)) { + return false; + } + + CountMinSketchImpl that = (CountMinSketchImpl) other; + + if (this.depth == that.depth && + this.width == that.width && + this.totalCount == that.totalCount) { + for (int i = 0; i < depth; ++i) { + if (this.hashA[i] != that.hashA[i]) { + return false; + } + + for (int j = 0; j < width; ++j) { + if (this.table[i][j] != that.table[i][j]) { + return false; + } + } + } + return true; + } else { + return false; + } + } + + @Override + public Version version() { + return Version.V1; + } + private void initTablesWith(int depth, int width, int seed) { this.table = new long[depth][width]; this.hashA = new long[depth]; @@ -221,27 +270,27 @@ private long estimateCountForStringItem(String item) { } @Override - public CountMinSketch mergeInPlace(CountMinSketch other) { + public CountMinSketch mergeInPlace(CountMinSketch other) throws CountMinSketchMergeException { if (other == null) { - throw new CMSMergeException("Cannot merge null estimator"); + throw new CountMinSketchMergeException("Cannot merge null estimator"); } if (!(other instanceof CountMinSketchImpl)) { - throw new CMSMergeException("Cannot merge estimator of class " + other.getClass().getName()); + throw new CountMinSketchMergeException("Cannot merge estimator of class " + other.getClass().getName()); } CountMinSketchImpl that = (CountMinSketchImpl) other; if (this.depth != that.depth) { - throw new CMSMergeException("Cannot merge estimators of different depth"); + throw new CountMinSketchMergeException("Cannot merge estimators of different depth"); } if (this.width != that.width) { - throw new CMSMergeException("Cannot merge estimators of different width"); + throw new CountMinSketchMergeException("Cannot merge estimators of different width"); } if (!Arrays.equals(this.hashA, that.hashA)) { - throw new CMSMergeException("Cannot merge estimators of different seed"); + throw new CountMinSketchMergeException("Cannot merge estimators of different seed"); } for (int i = 0; i < this.table.length; ++i) { @@ -256,13 +305,23 @@ public CountMinSketch mergeInPlace(CountMinSketch other) { } @Override - public void writeTo(OutputStream out) { - throw new UnsupportedOperationException("Not implemented yet"); - } + public void writeTo(OutputStream out) throws IOException { + DataOutputStream dos = new DataOutputStream(out); + + dos.writeInt(version().getVersionNumber()); + + dos.writeLong(this.totalCount); + dos.writeInt(this.depth); + dos.writeInt(this.width); - protected static class CMSMergeException extends RuntimeException { - public CMSMergeException(String message) { - super(message); + for (int i = 0; i < this.depth; ++i) { + dos.writeLong(this.hashA[i]); + } + + for (int i = 0; i < this.depth; ++i) { + for (int j = 0; j < this.width; ++j) { + dos.writeLong(table[i][j]); + } } } } diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java new file mode 100644 index 000000000000..e751556421bb --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +public class CountMinSketchMergeException extends Exception { + public CountMinSketchMergeException(String message) { + super(message); + } +} diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index ec5b4eddeca0..f19648fbd17d 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.util.sketch +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + import scala.reflect.ClassTag import scala.util.Random @@ -29,6 +31,16 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite private val seed = 42 + private def checkSerDe(sketch: CountMinSketch): Unit = { + val out = new ByteArrayOutputStream() + sketch.writeTo(out) + + val in = new ByteArrayInputStream(out.toByteArray) + val deserialized = CountMinSketch.readFrom(in) + + assert(sketch === deserialized) + } + def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"accuracy - $typeName") { val r = new Random() @@ -45,7 +57,10 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite } val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + checkSerDe(sketch) + sampledItemIndices.foreach(i => sketch.add(allItems(i))) + checkSerDe(sketch) val probCorrect = { val numErrors = allItems.map { item => @@ -75,11 +90,16 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite val sketches = perSketchItems.map { items => val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + checkSerDe(sketch) + items.foreach(sketch.add) + checkSerDe(sketch) + sketch } val mergedSketch = sketches.reduce(_ mergeInPlace _) + checkSerDe(mergedSketch) val expectedSketch = { val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) @@ -109,4 +129,22 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite testItemType[Long]("Long") { _.nextLong() } testItemType[String]("String") { r => r.nextString(r.nextInt(20)) } + + test("incompatible merge") { + intercept[CountMinSketchMergeException] { + CountMinSketch.create(10, 10, 1).mergeInPlace(null) + } + + intercept[CountMinSketchMergeException] { + val sketch1 = CountMinSketch.create(10, 20, 1) + val sketch2 = CountMinSketch.create(10, 20, 2) + sketch1.mergeInPlace(sketch2) + } + + intercept[CountMinSketchMergeException] { + val sketch1 = CountMinSketch.create(10, 10, 1) + val sketch2 = CountMinSketch.create(10, 20, 2) + sketch1.mergeInPlace(sketch2) + } + } } From 12bbefb675ea0ce0bfa9eec90f121d36ea57c64c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 25 Jan 2016 13:38:13 -0800 Subject: [PATCH 2/5] Addresses PR comments --- .../spark/util/sketch/CountMinSketch.java | 24 +----- .../spark/util/sketch/CountMinSketchImpl.java | 84 ++++++++++++++----- .../util/sketch/CountMinSketchSuite.scala | 5 +- 3 files changed, 68 insertions(+), 45 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index 2d0aa13ae189..189a8f7ecdb0 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -17,7 +17,6 @@ package org.apache.spark.util.sketch; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -136,28 +135,7 @@ public abstract CountMinSketch mergeInPlace(CountMinSketch other) * Reads in a {@link CountMinSketch} from an input stream. */ public static CountMinSketch readFrom(InputStream in) throws IOException { - DataInputStream dis = new DataInputStream(in); - - // Ignores version number - dis.readInt(); - - long totalCount = dis.readLong(); - int depth = dis.readInt(); - int width = dis.readInt(); - - long hashA[] = new long[depth]; - for (int i = 0; i < depth; ++i) { - hashA[i] = dis.readLong(); - } - - long table[][] = new long[depth][width]; - for (int i = 0; i < depth; ++i) { - for (int j = 0; j < width; ++j) { - table[i][j] = dis.readLong(); - } - } - - return new CountMinSketchImpl(depth, width, totalCount, hashA, table); + return CountMinSketchImpl.readFrom(in); } /** diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index 43cb8af10adb..2ea57ea59e3d 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -17,13 +17,30 @@ package org.apache.spark.util.sketch; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.Random; +/* + * Binary format of a serialized CountMinSketchImpl, version 1 (all values written in big-endian + * order): + * + * - Version number, always 1 (32 bit) + * - Total count of added items (64 bit) + * - Depth (32 bit) + * - Width (32 bit) + * - Hash functions (depth * 64 bit) + * - Count table + * - Row 0 (width * 64 bit) + * - Row 1 (width * 64 bit) + * - ... + * - Row depth - 1 (width * 64 bit) + */ class CountMinSketchImpl extends CountMinSketch { public static final long PRIME_MODULUS = (1L << 31) - 1; @@ -35,7 +52,7 @@ class CountMinSketchImpl extends CountMinSketch { private double eps; private double confidence; - public CountMinSketchImpl(int depth, int width, int seed) { + CountMinSketchImpl(int depth, int width, int seed) { this.depth = depth; this.width = width; this.eps = 2.0 / width; @@ -43,7 +60,7 @@ public CountMinSketchImpl(int depth, int width, int seed) { initTablesWith(depth, width, seed); } - public CountMinSketchImpl(double eps, double confidence, int seed) { + CountMinSketchImpl(double eps, double confidence, int seed) { // 2/w = eps ; w = 2/eps // 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence) this.eps = eps; @@ -53,7 +70,7 @@ public CountMinSketchImpl(double eps, double confidence, int seed) { initTablesWith(depth, width, seed); } - public CountMinSketchImpl(int depth, int width, long totalCount, long hashA[], long table[][]) { + CountMinSketchImpl(int depth, int width, long totalCount, long hashA[], long table[][]) { this.depth = depth; this.width = width; this.eps = 2.0 / width; @@ -75,24 +92,24 @@ public boolean equals(Object other) { CountMinSketchImpl that = (CountMinSketchImpl) other; - if (this.depth == that.depth && - this.width == that.width && - this.totalCount == that.totalCount) { - for (int i = 0; i < depth; ++i) { - if (this.hashA[i] != that.hashA[i]) { - return false; - } - - for (int j = 0; j < width; ++j) { - if (this.table[i][j] != that.table[i][j]) { - return false; - } - } - } - return true; - } else { - return false; - } + return + this.depth == that.depth && + this.width == that.width && + this.totalCount == that.totalCount && + Arrays.equals(this.hashA, that.hashA) && + Arrays.deepEquals(this.table, that.table); + } + + @Override + public int hashCode() { + int hash = depth; + + hash = hash * 31 + width; + hash = hash * 31 + (int) (totalCount ^ (totalCount >>> 32)); + hash = hash * 31 + Arrays.hashCode(hashA); + hash = hash * 31 + Arrays.deepHashCode(table); + + return hash; } @Override @@ -324,4 +341,29 @@ public void writeTo(OutputStream out) throws IOException { } } } + + public static CountMinSketchImpl readFrom(InputStream in) throws IOException { + DataInputStream dis = new DataInputStream(in); + + // Ignores version number + dis.readInt(); + + long totalCount = dis.readLong(); + int depth = dis.readInt(); + int width = dis.readInt(); + + long hashA[] = new long[depth]; + for (int i = 0; i < depth; ++i) { + hashA[i] = dis.readLong(); + } + + long table[][] = new long[depth][width]; + for (int i = 0; i < depth; ++i) { + for (int j = 0; j < width; ++j) { + table[i][j] = dis.readLong(); + } + } + + return new CountMinSketchImpl(depth, width, totalCount, hashA, table); + } } diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index f19648fbd17d..631f79b71e2c 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -31,6 +31,8 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite private val seed = 42 + // Serializes and deserializes a given `CountMinSketch`, then checks whether the deserialized + // version is equivalent to the original one. private def checkSerDe(sketch: CountMinSketch): Unit = { val out = new ByteArrayOutputStream() sketch.writeTo(out) @@ -43,7 +45,8 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"accuracy - $typeName") { - val r = new Random() + // Uses fixed seed to ensure reproducible test execution + val r = new Random(31) val numAllItems = 1000000 val allItems = Array.fill(numAllItems)(itemGenerator(r)) From 4abc4e06fc7077e10acad442ed2d81e5b156118f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 25 Jan 2016 13:40:19 -0800 Subject: [PATCH 3/5] Exception class renaming --- .../apache/spark/util/sketch/CountMinSketch.java | 4 ++-- .../spark/util/sketch/CountMinSketchImpl.java | 14 ++++++++------ ...eption.java => IncompatibleMergeException.java} | 4 ++-- .../spark/util/sketch/CountMinSketchSuite.scala | 6 +++--- 4 files changed, 15 insertions(+), 13 deletions(-) rename common/sketch/src/main/java/org/apache/spark/util/sketch/{CountMinSketchMergeException.java => IncompatibleMergeException.java} (88%) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index 189a8f7ecdb0..67938644d9f6 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -120,11 +120,11 @@ public int getVersionNumber() { * Note that only Count-Min sketches with the same {@code depth}, {@code width}, and random seed * can be merged. * - * @exception CountMinSketchMergeException if the {@code other} {@link CountMinSketch} has + * @exception IncompatibleMergeException if the {@code other} {@link CountMinSketch} has * incompatible depth, width, relative-error, confidence, or random seed. */ public abstract CountMinSketch mergeInPlace(CountMinSketch other) - throws CountMinSketchMergeException; + throws IncompatibleMergeException; /** * Writes out this {@link CountMinSketch} to an output stream in binary format. diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index 2ea57ea59e3d..8ea716df4a5c 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -287,27 +287,29 @@ private long estimateCountForStringItem(String item) { } @Override - public CountMinSketch mergeInPlace(CountMinSketch other) throws CountMinSketchMergeException { + public CountMinSketch mergeInPlace(CountMinSketch other) throws IncompatibleMergeException { if (other == null) { - throw new CountMinSketchMergeException("Cannot merge null estimator"); + throw new IncompatibleMergeException("Cannot merge null estimator"); } if (!(other instanceof CountMinSketchImpl)) { - throw new CountMinSketchMergeException("Cannot merge estimator of class " + other.getClass().getName()); + throw new IncompatibleMergeException( + "Cannot merge estimator of class " + other.getClass().getName() + ); } CountMinSketchImpl that = (CountMinSketchImpl) other; if (this.depth != that.depth) { - throw new CountMinSketchMergeException("Cannot merge estimators of different depth"); + throw new IncompatibleMergeException("Cannot merge estimators of different depth"); } if (this.width != that.width) { - throw new CountMinSketchMergeException("Cannot merge estimators of different width"); + throw new IncompatibleMergeException("Cannot merge estimators of different width"); } if (!Arrays.equals(this.hashA, that.hashA)) { - throw new CountMinSketchMergeException("Cannot merge estimators of different seed"); + throw new IncompatibleMergeException("Cannot merge estimators of different seed"); } for (int i = 0; i < this.table.length; ++i) { diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/IncompatibleMergeException.java similarity index 88% rename from common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java rename to common/sketch/src/main/java/org/apache/spark/util/sketch/IncompatibleMergeException.java index e751556421bb..64b567caa57c 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchMergeException.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/IncompatibleMergeException.java @@ -17,8 +17,8 @@ package org.apache.spark.util.sketch; -public class CountMinSketchMergeException extends Exception { - public CountMinSketchMergeException(String message) { +public class IncompatibleMergeException extends Exception { + public IncompatibleMergeException(String message) { super(message); } } diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index 631f79b71e2c..be4984e9cb30 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -134,17 +134,17 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite testItemType[String]("String") { r => r.nextString(r.nextInt(20)) } test("incompatible merge") { - intercept[CountMinSketchMergeException] { + intercept[IncompatibleMergeException] { CountMinSketch.create(10, 10, 1).mergeInPlace(null) } - intercept[CountMinSketchMergeException] { + intercept[IncompatibleMergeException] { val sketch1 = CountMinSketch.create(10, 20, 1) val sketch2 = CountMinSketch.create(10, 20, 2) sketch1.mergeInPlace(sketch2) } - intercept[CountMinSketchMergeException] { + intercept[IncompatibleMergeException] { val sketch1 = CountMinSketch.create(10, 10, 1) val sketch2 = CountMinSketch.create(10, 20, 2) sketch1.mergeInPlace(sketch2) From 9acfe33037cbe9c1c8b5f38e2fdc45159b523ed9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 25 Jan 2016 14:00:50 -0800 Subject: [PATCH 4/5] Fixes CountMinSketchImpl.equals --- .../java/org/apache/spark/util/sketch/CountMinSketchImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index 8ea716df4a5c..0209446ea3b1 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -86,7 +86,7 @@ public boolean equals(Object other) { return true; } - if (!(other instanceof CountMinSketchImpl)) { + if (other == null || !(other instanceof CountMinSketchImpl)) { return false; } From 4636af1b3e37c9be30d4e50e125a16005f8d0fab Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 25 Jan 2016 14:10:14 -0800 Subject: [PATCH 5/5] Uses fixed random seed in test case --- .../org/apache/spark/util/sketch/CountMinSketchSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index be4984e9cb30..b9c7f5c23a8f 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -84,7 +84,9 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite def testMergeInPlace[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"mergeInPlace - $typeName") { - val r = new Random() + // Uses fixed seed to ensure reproducible test execution + val r = new Random(31) + val numToMerge = 5 val numItemsPerSketch = 100000 val perSketchItems = Array.fill(numToMerge, numItemsPerSketch) {