From f25306865376b56f6636fe666963040b8840bdc1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 18 Jan 2016 10:23:11 -0800 Subject: [PATCH 01/16] Adds spark-sketch skeleton --- pom.xml | 1 + sketch/pom.xml | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 sketch/pom.xml diff --git a/pom.xml b/pom.xml index 43f08efaae86..ad4f2d9818e2 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ sql/catalyst sql/core sql/hive + sketch docker-integration-tests unsafe assembly diff --git a/sketch/pom.xml b/sketch/pom.xml new file mode 100644 index 000000000000..9b4a98eccafb --- /dev/null +++ b/sketch/pom.xml @@ -0,0 +1,42 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.10 + 2.0.0-SNAPSHOT + ../pom.xml + + + org.apache.spark + spark-sketch_2.10 + jar + Spark Project Sketch + http://spark.apache.org/ + + sketch + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + From 1ecfd26f013a1b7108a00c342578620e28f49971 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 19 Jan 2016 13:17:30 -0800 Subject: [PATCH 02/16] Moves sketch to common/sketch --- {sketch => common/sketch}/pom.xml | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename {sketch => common/sketch}/pom.xml (97%) diff --git a/sketch/pom.xml b/common/sketch/pom.xml similarity index 97% rename from sketch/pom.xml rename to common/sketch/pom.xml index 9b4a98eccafb..67723fa421ab 100644 --- a/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -23,7 +23,7 @@ org.apache.spark spark-parent_2.10 2.0.0-SNAPSHOT - ../pom.xml + ../../pom.xml org.apache.spark diff --git a/pom.xml b/pom.xml index ad4f2d9818e2..8194517af1a4 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ + common/sketch tags core graphx @@ -97,7 +98,6 @@ sql/catalyst sql/core sql/hive - sketch docker-integration-tests unsafe assembly From c9b09248e3fb01853177768324cc9a2a73b6d326 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 19 Jan 2016 13:52:10 -0800 Subject: [PATCH 03/16] Copies Murmur3_x86_32 and Platform from unsafe --- .../org/apache/spark/unsafe/Platform.java | 170 ++++++++++++++++++ .../spark/unsafe/hash/Murmur3_x86_32.java | 126 +++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java create mode 100644 common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java diff --git a/common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java b/common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java new file mode 100644 index 000000000000..0d6b215fe5aa --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe; + +import java.lang.reflect.Field; + +import sun.misc.Unsafe; + +public final class Platform { + + private static final Unsafe _UNSAFE; + + public static final int BYTE_ARRAY_OFFSET; + + public static final int INT_ARRAY_OFFSET; + + public static final int LONG_ARRAY_OFFSET; + + public static final int DOUBLE_ARRAY_OFFSET; + + public static int getInt(Object object, long offset) { + return _UNSAFE.getInt(object, offset); + } + + public static void putInt(Object object, long offset, int value) { + _UNSAFE.putInt(object, offset, value); + } + + public static boolean getBoolean(Object object, long offset) { + return _UNSAFE.getBoolean(object, offset); + } + + public static void putBoolean(Object object, long offset, boolean value) { + _UNSAFE.putBoolean(object, offset, value); + } + + public static byte getByte(Object object, long offset) { + return _UNSAFE.getByte(object, offset); + } + + public static void putByte(Object object, long offset, byte value) { + _UNSAFE.putByte(object, offset, value); + } + + public static short getShort(Object object, long offset) { + return _UNSAFE.getShort(object, offset); + } + + public static void putShort(Object object, long offset, short value) { + _UNSAFE.putShort(object, offset, value); + } + + public static long getLong(Object object, long offset) { + return _UNSAFE.getLong(object, offset); + } + + public static void putLong(Object object, long offset, long value) { + _UNSAFE.putLong(object, offset, value); + } + + public static float getFloat(Object object, long offset) { + return _UNSAFE.getFloat(object, offset); + } + + public static void putFloat(Object object, long offset, float value) { + _UNSAFE.putFloat(object, offset, value); + } + + public static double getDouble(Object object, long offset) { + return _UNSAFE.getDouble(object, offset); + } + + public static void putDouble(Object object, long offset, double value) { + _UNSAFE.putDouble(object, offset, value); + } + + public static Object getObjectVolatile(Object object, long offset) { + return _UNSAFE.getObjectVolatile(object, offset); + } + + public static void putObjectVolatile(Object object, long offset, Object value) { + _UNSAFE.putObjectVolatile(object, offset, value); + } + + public static long allocateMemory(long size) { + return _UNSAFE.allocateMemory(size); + } + + public static void freeMemory(long address) { + _UNSAFE.freeMemory(address); + } + + public static void copyMemory( + Object src, long srcOffset, Object dst, long dstOffset, long length) { + // Check if dstOffset is before or after srcOffset to determine if we should copy + // forward or backwards. This is necessary in case src and dst overlap. + if (dstOffset < srcOffset) { + while (length > 0) { + long size = Math.min(length, UNSAFE_COPY_THRESHOLD); + _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); + length -= size; + srcOffset += size; + dstOffset += size; + } + } else { + srcOffset += length; + dstOffset += length; + while (length > 0) { + long size = Math.min(length, UNSAFE_COPY_THRESHOLD); + srcOffset -= size; + dstOffset -= size; + _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); + length -= size; + } + + } + } + + /** + * Raises an exception bypassing compiler checks for checked exceptions. + */ + public static void throwException(Throwable t) { + _UNSAFE.throwException(t); + } + + /** + * Limits the number of bytes to copy per {@link Unsafe#copyMemory(long, long, long)} to + * allow safepoint polling during a large copy. + */ + private static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L; + + static { + sun.misc.Unsafe unsafe; + try { + Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + unsafe = (sun.misc.Unsafe) unsafeField.get(null); + } catch (Throwable cause) { + unsafe = null; + } + _UNSAFE = unsafe; + + if (_UNSAFE != null) { + BYTE_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(byte[].class); + INT_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(int[].class); + LONG_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(long[].class); + DOUBLE_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(double[].class); + } else { + BYTE_ARRAY_OFFSET = 0; + INT_ARRAY_OFFSET = 0; + LONG_ARRAY_OFFSET = 0; + DOUBLE_ARRAY_OFFSET = 0; + } + } +} diff --git a/common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java new file mode 100644 index 000000000000..5e7ee480cafd --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.hash; + +import org.apache.spark.unsafe.Platform; + +/** + * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction. + */ +public final class Murmur3_x86_32 { + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + + private final int seed; + + public Murmur3_x86_32(int seed) { + this.seed = seed; + } + + @Override + public String toString() { + return "Murmur3_32(seed=" + seed + ")"; + } + + public int hashInt(int input) { + return hashInt(input, seed); + } + + public static int hashInt(int input, int seed) { + int k1 = mixK1(input); + int h1 = mixH1(seed, k1); + + return fmix(h1, 4); + } + + public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { + return hashUnsafeWords(base, offset, lengthInBytes, seed); + } + + public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { + // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. + assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; + int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); + return fmix(h1, lengthInBytes); + } + + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { + assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; + int lengthAligned = lengthInBytes - lengthInBytes % 4; + int h1 = hashBytesByInt(base, offset, lengthAligned, seed); + for (int i = lengthAligned; i < lengthInBytes; i++) { + int halfWord = Platform.getByte(base, offset + i); + int k1 = mixK1(halfWord); + h1 = mixH1(h1, k1); + } + return fmix(h1, lengthInBytes); + } + + private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) { + assert (lengthInBytes % 4 == 0); + int h1 = seed; + for (int i = 0; i < lengthInBytes; i += 4) { + int halfWord = Platform.getInt(base, offset + i); + int k1 = mixK1(halfWord); + h1 = mixH1(h1, k1); + } + return h1; + } + + public int hashLong(long input) { + return hashLong(input, seed); + } + + public static int hashLong(long input, int seed) { + int low = (int) input; + int high = (int) (input >>> 32); + + int k1 = mixK1(low); + int h1 = mixH1(seed, k1); + + k1 = mixK1(high); + h1 = mixH1(h1, k1); + + return fmix(h1, 8); + } + + private static int mixK1(int k1) { + k1 *= C1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= C2; + return k1; + } + + private static int mixH1(int h1, int k1) { + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + h1 = h1 * 5 + 0xe6546b64; + return h1; + } + + // Finalization mix - force all bits of a hash block to avalanche + private static int fmix(int h1, int length) { + h1 ^= length; + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >>> 16; + return h1; + } +} From d920a3a54942dbc04623914fba437c1f4fa8e7cd Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 19 Jan 2016 16:24:25 -0800 Subject: [PATCH 04/16] Initial version of CountMinSketch --- .../spark/util/sketch/CountMinSketch.java | 55 ++++ .../spark/util/sketch/CountMinSketchImpl.java | 283 ++++++++++++++++++ .../util/sketch/CountMinSketchSuite.scala | 57 ++++ 3 files changed, 395 insertions(+) create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java create mode 100644 common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java new file mode 100644 index 000000000000..afcc3ace6bf2 --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +import java.io.InputStream; +import java.io.OutputStream; + +abstract public class CountMinSketch { + public abstract double relativeError(); + + public abstract double confidence(); + + public abstract int depth(); + + public abstract int width(); + + public abstract long totalCount(); + + public abstract void add(Object item); + + public abstract void add(Object item, long count); + + public abstract long estimateCount(Object item); + + public abstract CountMinSketch mergeInPlace(CountMinSketch other); + + public abstract void writeTo(OutputStream out); + + public static CountMinSketch readFrom(InputStream in) { + return null; + } + + public static CountMinSketch create(int depth, int width, int seed) { + return new CountMinSketchImpl(depth, width, seed); + } + + public static CountMinSketch create(double eps, double confidence, int seed) { + return new CountMinSketchImpl(eps, confidence, seed); + } +} diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java new file mode 100644 index 000000000000..fbbd1de06b76 --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Random; + +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.hash.Murmur3_x86_32; + +public class CountMinSketchImpl extends CountMinSketch { + public static final long PRIME_MODULUS = (1L << 31) - 1; + + private int depth; + private int width; + private long[][] table; + private long[] hashA; + private long totalCount; + private double eps; + private double confidence; + + public CountMinSketchImpl(int depth, int width, int seed) { + this.depth = depth; + this.width = width; + this.eps = 2.0 / width; + this.confidence = 1 - 1 / Math.pow(2, depth); + initTablesWith(depth, width, seed); + } + + public CountMinSketchImpl(double eps, double confidence, int seed) { + // 2/w = eps ; w = 2/eps + // 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence) + this.eps = eps; + this.confidence = confidence; + this.width = (int) Math.ceil(2 / eps); + this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2)); + initTablesWith(depth, width, seed); + } + + public CountMinSketchImpl(int depth, int width, long totalCount, long[] hashA, long[][] table) { + this.depth = depth; + this.width = width; + this.eps = 2.0 / width; + this.confidence = 1 - 1 / Math.pow(2, depth); + this.hashA = hashA; + this.table = table; + this.totalCount = totalCount; + } + + private void initTablesWith(int depth, int width, int seed) { + this.table = new long[depth][width]; + this.hashA = new long[depth]; + Random r = new Random(seed); + // We're using a linear hash functions + // of the form (a*x+b) mod p. + // a,b are chosen independently for each hash function. + // However we can set b = 0 as all it does is shift the results + // without compromising their uniformity or independence with + // the other hashes. + for (int i = 0; i < depth; ++i) { + hashA[i] = r.nextInt(Integer.MAX_VALUE); + } + } + + @Override + public double relativeError() { + return eps; + } + + @Override + public double confidence() { + return confidence; + } + + @Override + public int depth() { + return depth; + } + + @Override + public int width() { + return width; + } + + @Override + public long totalCount() { + return totalCount; + } + + @Override + public void add(Object item) { + add(item, 1); + } + + @Override + public void add(Object item, long count) { + if (item instanceof String) { + addString((String) item, count); + } else { + long longValue; + + if (item instanceof Long) { + longValue = (Long) item; + } else if (item instanceof Integer) { + longValue = ((Integer) item).longValue(); + } else if (item instanceof Short) { + longValue = ((Short) item).longValue(); + } else if (item instanceof Byte) { + longValue = ((Byte) item).longValue(); + } else { + throw new IllegalArgumentException( + "Support for " + item.getClass().getName() + " not implemented" + ); + } + + addLong(longValue, count); + } + } + + private void addString(String item, long count) { + if (count < 0) { + throw new IllegalArgumentException("Negative increments not implemented"); + } + + int[] buckets = getHashBuckets(item, depth, width); + + for (int i = 0; i < depth; ++i) { + table[i][buckets[i]] += count; + } + + totalCount += count; + } + + private void addLong(long item, long count) { + if (count < 0) { + throw new IllegalArgumentException("Negative increments not implemented"); + } + + for (int i = 0; i < depth; ++i) { + table[i][hash(item, i)] += count; + } + + totalCount += count; + } + + private int hash(long item, int count) { + long hash = hashA[count] * item; + // A super fast way of computing x mod 2^p-1 + // See http://www.cs.princeton.edu/courses/archive/fall09/cos521/Handouts/universalclasses.pdf + // page 149, right after Proposition 7. + hash += hash >> 32; + hash &= PRIME_MODULUS; + // Doing "%" after (int) conversion is ~2x faster than %'ing longs. + return ((int) hash) % width; + } + + private static int[] getHashBuckets(String key, int hashCount, int max) { + byte[] b; + try { + b = key.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return getHashBuckets(b, hashCount, max); + } + + private static int[] getHashBuckets(byte[] b, int hashCount, int max) { + int[] result = new int[hashCount]; + int hash1 = Murmur3_x86_32.hashUnsafeBytes(b, Platform.BYTE_ARRAY_OFFSET, b.length, 0); + int hash2 = Murmur3_x86_32.hashUnsafeBytes(b, Platform.BYTE_ARRAY_OFFSET, b.length, hash1); + for (int i = 0; i < hashCount; i++) { + result[i] = Math.abs((hash1 + i * hash2) % max); + } + return result; + } + + @Override + public long estimateCount(Object item) { + if (item instanceof String) { + return estimateCountForStringItem((String) item); + } else { + long longValue; + + if (item instanceof Long) { + longValue = (Long) item; + } else if (item instanceof Integer) { + longValue = ((Integer) item).longValue(); + } else if (item instanceof Short) { + longValue = ((Short) item).longValue(); + } else if (item instanceof Byte) { + longValue = ((Byte) item).longValue(); + } else { + throw new IllegalArgumentException( + "Support for " + item.getClass().getName() + " not implemented" + ); + } + + return estimateCountForLongItem(longValue); + } + } + + private long estimateCountForLongItem(long item) { + long res = Long.MAX_VALUE; + for (int i = 0; i < depth; ++i) { + res = Math.min(res, table[i][hash(item, i)]); + } + return res; + } + + private long estimateCountForStringItem(String item) { + long res = Long.MAX_VALUE; + int[] buckets = getHashBuckets(item, depth, width); + for (int i = 0; i < depth; ++i) { + res = Math.min(res, table[i][buckets[i]]); + } + return res; + } + + @Override + public CountMinSketch mergeInPlace(CountMinSketch other) { + if (other == null) { + throw new CMSMergeException("Cannot merge null estimator"); + } + + if (!(other instanceof CountMinSketchImpl)) { + throw new CMSMergeException("Cannot merge estimator of class " + other.getClass().getName()); + } + + CountMinSketchImpl that = (CountMinSketchImpl) other; + + if (this.depth != that.depth) { + throw new CMSMergeException("Cannot merge estimators of different depth"); + } + + if (this.width != that.width) { + throw new CMSMergeException("Cannot merge estimators of different width"); + } + + long[] hashA = Arrays.copyOf(that.hashA, that.hashA.length); + if (!Arrays.equals(this.hashA, hashA)) { + throw new CMSMergeException("Cannot merge estimators of different seed"); + } + + long[][] table = new long[this.depth][this.width]; + for (int i = 0; i < table.length; ++i) { + for (int j = 0; j < table[i].length; ++j) { + table[i][j] = this.table[i][j] + that.table[i][j]; + } + } + + long totalCount = this.totalCount + that.totalCount; + + return new CountMinSketchImpl(this.depth, this.width, totalCount, hashA, table); + } + + @Override + public void writeTo(OutputStream out) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + protected static class CMSMergeException extends RuntimeException { + public CMSMergeException(String message) { + super(message); + } + } +} diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala new file mode 100644 index 000000000000..7de16c66a77d --- /dev/null +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch + +import java.util.Random + +import org.scalatest.FunSuite // scalastyle:ignore funsuite + +class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite + test("accuracy - long") { + val epsOfTotalCount = 0.0001 + val confidence = 0.99 + val seed = 42 + + val numItems = 1000000 + val maxScale = 20 + val items = { + val r = new Random(seed) + Array.fill(numItems) { r.nextInt(1 << r.nextInt(maxScale)) } + } + + val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + items.foreach(sketch.add) + + val exactFreq = Array.fill(1 << maxScale)(0) + items.foreach(exactFreq(_) += 1) + + val pCorrect = { + val numErrors = exactFreq.zipWithIndex.map { case (f, i) => + val ratio = (sketch.estimateCount(i) - f).toDouble / numItems + if (ratio > epsOfTotalCount) 1 else 0 + }.sum + + 1.0 - numErrors.toDouble / exactFreq.length + } + + assert( + pCorrect > confidence, + s"Confidence not reached: required $confidence, reached $pCorrect" + ) + } +} From 36ce0774d6be1fbfe60c56489bc62546501fce7a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 12:02:08 -0800 Subject: [PATCH 05/16] Tests accuracy for all supported types --- .../util/sketch/CountMinSketchSuite.scala | 73 ++++++++++++------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index 7de16c66a77d..1812b0ca1387 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -17,41 +17,60 @@ package org.apache.spark.util.sketch -import java.util.Random +import scala.reflect.ClassTag +import scala.util.Random import org.scalatest.FunSuite // scalastyle:ignore funsuite class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite - test("accuracy - long") { - val epsOfTotalCount = 0.0001 - val confidence = 0.99 - val seed = 42 - - val numItems = 1000000 - val maxScale = 20 - val items = { - val r = new Random(seed) - Array.fill(numItems) { r.nextInt(1 << r.nextInt(maxScale)) } - } + private val epsOfTotalCount = 0.0001 - val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) - items.foreach(sketch.add) + private val confidence = 0.99 - val exactFreq = Array.fill(1 << maxScale)(0) - items.foreach(exactFreq(_) += 1) + private val seed = 42 - val pCorrect = { - val numErrors = exactFreq.zipWithIndex.map { case (f, i) => - val ratio = (sketch.estimateCount(i) - f).toDouble / numItems - if (ratio > epsOfTotalCount) 1 else 0 - }.sum + def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T) { + test(s"accuracy - $typeName") { + val r = new Random() - 1.0 - numErrors.toDouble / exactFreq.length - } + val numAllItems = 1000000 + val allItems = Array.fill(numAllItems)(itemGenerator(r)) + + val numSamples = numAllItems / 10 + val sampledItemIndices = Array.fill(numSamples)(r.nextInt(numAllItems)) + + val exactFreq = { + val sampledItems = sampledItemIndices.map(allItems) + sampledItems.groupBy(identity).mapValues(_.length.toLong) + } + + val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + sampledItemIndices.foreach(i => sketch.add(allItems(i))) + + val probCorrect = { + val numErrors = allItems.map { item => + val count = exactFreq.getOrElse(item, 0L) + val ratio = (sketch.estimateCount(item) - count).toDouble / numAllItems + if (ratio > epsOfTotalCount) 1 else 0 + }.sum + + 1D - numErrors.toDouble / numAllItems + } - assert( - pCorrect > confidence, - s"Confidence not reached: required $confidence, reached $pCorrect" - ) + assert( + probCorrect > confidence, + s"Confidence not reached: required $confidence, reached $probCorrect" + ) + } } + + testAccuracy[Byte]("Byte") { _.nextInt.toByte } + + testAccuracy[Short]("Short") { _.nextInt.toShort } + + testAccuracy[Int]("Int") { _.nextInt } + + testAccuracy[Long]("Long") { _.nextLong() } + + testAccuracy[String]("String") { r => r.nextString(r.nextInt(20)) } } From 814317fea4345d06b6ebfc9e9b093ccd0d6182f1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 13:32:36 -0800 Subject: [PATCH 06/16] Tests mergeInPlace --- .../spark/util/sketch/CountMinSketch.java | 2 +- .../util/sketch/CountMinSketchSuite.scala | 48 ++++++++++++++++--- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index afcc3ace6bf2..49e724aef7b4 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -42,7 +42,7 @@ abstract public class CountMinSketch { public abstract void writeTo(OutputStream out); public static CountMinSketch readFrom(InputStream in) { - return null; + throw new UnsupportedOperationException("Not implemented yet"); } public static CountMinSketch create(int depth, int width, int seed) { diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index 1812b0ca1387..312a6751d8aa 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -29,7 +29,7 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite private val seed = 42 - def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T) { + def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"accuracy - $typeName") { val r = new Random() @@ -64,13 +64,49 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite } } - testAccuracy[Byte]("Byte") { _.nextInt.toByte } + def testMerge[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { + test(s"mergeInPlace - $typeName") { + val r = new Random() + val numToMerge = 5 + val numItemsPerSketch = 100000 + val perSketchItems = Array.fill(numToMerge, numItemsPerSketch) { + itemGenerator(r) + } + + val sketches = perSketchItems.map { items => + val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + items.foreach(sketch.add) + sketch + } + + val mergedSketch = sketches.reduce(_ mergeInPlace _) + + val expectedSketch = { + val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + perSketchItems.foreach(_.foreach(sketch.add)) + sketch + } + + perSketchItems.foreach { + _.foreach { item => + assert(mergedSketch.estimateCount(item) === expectedSketch.estimateCount(item)) + } + } + } + } + + def testItemType[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { + testAccuracy[T](typeName)(itemGenerator) + testMerge[T](typeName)(itemGenerator) + } + + testItemType[Byte]("Byte") { _.nextInt.toByte } - testAccuracy[Short]("Short") { _.nextInt.toShort } + testItemType[Short]("Short") { _.nextInt.toShort } - testAccuracy[Int]("Int") { _.nextInt } + testItemType[Int]("Int") { _.nextInt } - testAccuracy[Long]("Long") { _.nextLong() } + testItemType[Long]("Long") { _.nextLong() } - testAccuracy[String]("String") { r => r.nextString(r.nextInt(20)) } + testItemType[String]("String") { r => r.nextString(r.nextInt(20)) } } From 3d3d81cd376b88df06cce6a0ac7e08324e6a9d7a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 14:53:27 -0800 Subject: [PATCH 07/16] Adds sketch module to sparktestsupport --- dev/sparktestsupport/modules.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index efe58ea2e0e7..032c0616edb1 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -113,6 +113,18 @@ def contains_file(self, filename): ) +sketch = Module( + name="sketch", + dependencies=[], + source_file_regexes=[ + "common/sketch/", + ], + sbt_test_goals=[ + "sketch/test" + ] +) + + graphx = Module( name="graphx", dependencies=[], From 9dd56e1eaea71327fbaf4bbfcbdfce35adc79c6d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 15:00:37 -0800 Subject: [PATCH 08/16] Moves Murmur3_x86_32 and Platform to package o.a.s.util.sketch.unsafe --- .../org/apache/spark/util/sketch/CountMinSketchImpl.java | 5 +++-- .../org/apache/spark/{ => util/sketch}/unsafe/Platform.java | 2 +- .../spark/{ => util/sketch}/unsafe/hash/Murmur3_x86_32.java | 4 ++-- .../org/apache/spark/util/sketch/CountMinSketchSuite.scala | 6 +++--- 4 files changed, 9 insertions(+), 8 deletions(-) rename common/sketch/src/main/java/org/apache/spark/{ => util/sketch}/unsafe/Platform.java (99%) rename common/sketch/src/main/java/org/apache/spark/{ => util/sketch}/unsafe/hash/Murmur3_x86_32.java (97%) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index fbbd1de06b76..0db3644df5ff 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -22,9 +22,10 @@ import java.util.Arrays; import java.util.Random; -import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.hash.Murmur3_x86_32; +import org.apache.spark.util.sketch.unsafe.Platform; +import org.apache.spark.util.sketch.unsafe.hash.Murmur3_x86_32; +// This class is largely based on the `CountMinSketch` class from stream-lib. public class CountMinSketchImpl extends CountMinSketch { public static final long PRIME_MODULUS = (1L << 31) - 1; diff --git a/common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/Platform.java similarity index 99% rename from common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java rename to common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/Platform.java index 0d6b215fe5aa..e2b962715a66 100644 --- a/common/sketch/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/Platform.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.unsafe; +package org.apache.spark.util.sketch.unsafe; import java.lang.reflect.Field; diff --git a/common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/hash/Murmur3_x86_32.java similarity index 97% rename from common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java rename to common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/hash/Murmur3_x86_32.java index 5e7ee480cafd..6e523ead0c1c 100644 --- a/common/sketch/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/hash/Murmur3_x86_32.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.spark.unsafe.hash; +package org.apache.spark.util.sketch.unsafe.hash; -import org.apache.spark.unsafe.Platform; +import org.apache.spark.util.sketch.unsafe.Platform; /** * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction. diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index 312a6751d8aa..dbc93a28743d 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -100,11 +100,11 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite testMerge[T](typeName)(itemGenerator) } - testItemType[Byte]("Byte") { _.nextInt.toByte } + testItemType[Byte]("Byte") { _.nextInt().toByte } - testItemType[Short]("Short") { _.nextInt.toShort } + testItemType[Short]("Short") { _.nextInt().toShort } - testItemType[Int]("Int") { _.nextInt } + testItemType[Int]("Int") { _.nextInt() } testItemType[Long]("Long") { _.nextLong() } From 1208a0f426c5fa1dfc7e78708df75b6522d221f4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 15:18:39 -0800 Subject: [PATCH 09/16] Ensures that mergeInPlace merges in place --- .../spark/util/sketch/CountMinSketchImpl.java | 14 ++++++-------- .../spark/util/sketch/CountMinSketchSuite.scala | 4 ++-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index 0db3644df5ff..83b52067b336 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -254,21 +254,19 @@ public CountMinSketch mergeInPlace(CountMinSketch other) { throw new CMSMergeException("Cannot merge estimators of different width"); } - long[] hashA = Arrays.copyOf(that.hashA, that.hashA.length); - if (!Arrays.equals(this.hashA, hashA)) { + if (!Arrays.equals(this.hashA, that.hashA)) { throw new CMSMergeException("Cannot merge estimators of different seed"); } - long[][] table = new long[this.depth][this.width]; - for (int i = 0; i < table.length; ++i) { - for (int j = 0; j < table[i].length; ++j) { - table[i][j] = this.table[i][j] + that.table[i][j]; + for (int i = 0; i < this.table.length; ++i) { + for (int j = 0; j < this.table[i].length; ++j) { + this.table[i][j] = this.table[i][j] + that.table[i][j]; } } - long totalCount = this.totalCount + that.totalCount; + this.totalCount += that.totalCount; - return new CountMinSketchImpl(this.depth, this.width, totalCount, hashA, table); + return this; } @Override diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index dbc93a28743d..ec5b4eddeca0 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -64,7 +64,7 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite } } - def testMerge[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { + def testMergeInPlace[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"mergeInPlace - $typeName") { val r = new Random() val numToMerge = 5 @@ -97,7 +97,7 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite def testItemType[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { testAccuracy[T](typeName)(itemGenerator) - testMerge[T](typeName)(itemGenerator) + testMergeInPlace[T](typeName)(itemGenerator) } testItemType[Byte]("Byte") { _.nextInt().toByte } From 64b21628b887f12e8b570bd4dd6d1e721f30baab Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 16:15:45 -0800 Subject: [PATCH 10/16] Javadoc --- .../spark/util/sketch/CountMinSketch.java | 77 +++++++++++++++++++ .../spark/util/sketch/CountMinSketchImpl.java | 1 - 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index 49e724aef7b4..ab929ba17113 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -20,35 +20,112 @@ import java.io.InputStream; import java.io.OutputStream; +/** + * An implementation of Count-Min sketch data structure for the following data types: + *
    + *
  • {@link Byte}
  • + *
  • {@link Short}
  • + *
  • {@link Integer}
  • + *
  • {@link Long}
  • + *
  • {@link String}
  • + *
+ * A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in + * sub-linear space. Each {@link CountMinSketch} is initialized with a random seed, and a pair + * of parameters: + *
    + *
  1. relative error (or {@code eps}), and + *
  2. confidence (or {@code delta}) + *
+ * Suppose you want to estimate the number of times an element {@code x} has appeared in a data + * stream so far. With probability {@code delta}, the estimate of this frequency is within the + * range {@code true frequency <= estimate <= true frequency + eps * N}, where {@code N} is the + * total count of items have appeared the the data stream so far. + * + * Under the cover, a {@link CountMinSketch} is essentially a two-dimensional {@code long} array + * with depth {@code d} and width {@code w}, where + *
    + *
  • {@code d = ceil(2 / eps)}
  • + *
  • {@code w = ceil(-log(1 - confidence) / log(2))}
  • + *
+ * + * See http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf for technical details, + * including proofs of the estimates and error bounds used in this implementation. + * + * This implementation is largely based on the {@code CountMinSketch} class from stream-lib. + */ abstract public class CountMinSketch { + /** + * Returns the relative error (or {@code eps}) of this {@link CountMinSketch}. + */ public abstract double relativeError(); + /** + * Returns the confidence (or {@code delta}) of this {@link CountMinSketch}. + */ public abstract double confidence(); + /** + * Depth of this {@link CountMinSketch}. + */ public abstract int depth(); + /** + * Width of this {@link CountMinSketch}. + */ public abstract int width(); + /** + * Total count of items added to this {@link CountMinSketch} so far. + */ public abstract long totalCount(); + /** + * Adds 1 to {@code item}. + */ public abstract void add(Object item); + /** + * Adds {@code count} to {@code item}. + */ public abstract void add(Object item, long count); + /** + * Returns the estimated frequency of {@code item}. + */ public abstract long estimateCount(Object item); + /** + * Merges another {@link CountMinSketch} with this one in place. + * + * Note that only Count-Min sketches with the same {@code depth}, {@code width}, and random seed + * can be merged. + */ public abstract CountMinSketch mergeInPlace(CountMinSketch other); + /** + * Writes out this {@link CountMinSketch} to an output stream in binary format. + */ public abstract void writeTo(OutputStream out); + /** + * Reads in a {@link CountMinSketch} from an input stream. + */ public static CountMinSketch readFrom(InputStream in) { throw new UnsupportedOperationException("Not implemented yet"); } + /** + * Creates a {@link CountMinSketch} with given {@code depth}, {@code width}, and random + * {@code seed}. + */ public static CountMinSketch create(int depth, int width, int seed) { return new CountMinSketchImpl(depth, width, seed); } + /** + * Creates a {@link CountMinSketch} with given relative error ({@code eps}), {@code confidence}, + * and random {@code seed}. + */ public static CountMinSketch create(double eps, double confidence, int seed) { return new CountMinSketchImpl(eps, confidence, seed); } diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index 83b52067b336..ef73c6745bd5 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -25,7 +25,6 @@ import org.apache.spark.util.sketch.unsafe.Platform; import org.apache.spark.util.sketch.unsafe.hash.Murmur3_x86_32; -// This class is largely based on the `CountMinSketch` class from stream-lib. public class CountMinSketchImpl extends CountMinSketch { public static final long PRIME_MODULUS = (1L << 31) - 1; From 600e25a661ce953310889239b46b4be6cc714350 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 18:00:38 -0800 Subject: [PATCH 11/16] Adds sketch to SparkBuild --- project/SparkBuild.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3927b88fb0bf..bb03ef9d19d0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -36,11 +36,13 @@ object BuildCommons { val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka, - streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) = - Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", + streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags, sketch) = + Seq( + "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter", - "streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _)) + "streaming-zeromq", "launcher", "unsafe", "test-tags", "sketch" + ).map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests) = From 1f0a120b76fe52f9fa87eb0ed01a47babe02601e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 18:59:26 -0800 Subject: [PATCH 12/16] Moves Platform and Murmur3_x86_32 to o.a.s.u.sketch and makes them package private --- .../org/apache/spark/util/sketch/CountMinSketchImpl.java | 5 +---- .../spark/util/sketch/{unsafe/hash => }/Murmur3_x86_32.java | 6 ++---- .../org/apache/spark/util/sketch/{unsafe => }/Platform.java | 4 ++-- 3 files changed, 5 insertions(+), 10 deletions(-) rename common/sketch/src/main/java/org/apache/spark/util/sketch/{unsafe/hash => }/Murmur3_x86_32.java (96%) rename common/sketch/src/main/java/org/apache/spark/util/sketch/{unsafe => }/Platform.java (98%) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index ef73c6745bd5..0fbf280e98e5 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -22,10 +22,7 @@ import java.util.Arrays; import java.util.Random; -import org.apache.spark.util.sketch.unsafe.Platform; -import org.apache.spark.util.sketch.unsafe.hash.Murmur3_x86_32; - -public class CountMinSketchImpl extends CountMinSketch { +class CountMinSketchImpl extends CountMinSketch { public static final long PRIME_MODULUS = (1L << 31) - 1; private int depth; diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/hash/Murmur3_x86_32.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java similarity index 96% rename from common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/hash/Murmur3_x86_32.java rename to common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java index 6e523ead0c1c..475c63eaf1eb 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/hash/Murmur3_x86_32.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.spark.util.sketch.unsafe.hash; - -import org.apache.spark.util.sketch.unsafe.Platform; +package org.apache.spark.util.sketch; /** * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction. */ -public final class Murmur3_x86_32 { +final class Murmur3_x86_32 { private static final int C1 = 0xcc9e2d51; private static final int C2 = 0x1b873593; diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/Platform.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java similarity index 98% rename from common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/Platform.java rename to common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java index e2b962715a66..2a37fb127ac9 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/unsafe/Platform.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.spark.util.sketch.unsafe; +package org.apache.spark.util.sketch; import java.lang.reflect.Field; import sun.misc.Unsafe; -public final class Platform { +final class Platform { private static final Unsafe _UNSAFE; From e06ff13396c9905fb63d3e84f70c64690880b72d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Jan 2016 19:00:29 -0800 Subject: [PATCH 13/16] Removes unused CountMinSketchImpl constructor --- .../apache/spark/util/sketch/CountMinSketchImpl.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java index 0fbf280e98e5..e9fdbe3a8686 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java @@ -51,16 +51,6 @@ public CountMinSketchImpl(double eps, double confidence, int seed) { initTablesWith(depth, width, seed); } - public CountMinSketchImpl(int depth, int width, long totalCount, long[] hashA, long[][] table) { - this.depth = depth; - this.width = width; - this.eps = 2.0 / width; - this.confidence = 1 - 1 / Math.pow(2, depth); - this.hashA = hashA; - this.table = table; - this.totalCount = totalCount; - } - private void initTablesWith(int depth, int width, int seed) { this.table = new long[depth][width]; this.hashA = new long[depth]; From 4adb57a688036313975699a898a6719df2585c53 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 22 Jan 2016 12:50:31 -0800 Subject: [PATCH 14/16] Fixes SparkBuild --- project/SparkBuild.scala | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index bb03ef9d19d0..e3f7f903616e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -34,15 +34,24 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka, - streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags, sketch) = - Seq( - "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", - "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", - "streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter", - "streaming-zeromq", "launcher", "unsafe", "test-tags", "sketch" - ).map(ProjectRef(buildLocation, _)) + val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer) = Seq( + "catalyst", "sql", "hive", "hive-thriftserver" + ).map(ProjectRef(buildLocation, _)) + + val streamingProjects@Seq( + streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka, streamingMqtt, + streamingTwitter, streamingZeromq + ) = Seq( + "streaming", "streaming-flume-sink", "streaming-flume", "streaming-akka", "streaming-kafka", + "streaming-mqtt", "streaming-twitter", "streaming-zeromq" + ).map(ProjectRef(buildLocation, _)) + + val allProjects@Seq( + core, graphx, mllib, repl, networkCommon, networkShuffle, launcher, unsafe, testTags, sketch, _* + ) = Seq( + "core", "graphx", "mllib", "repl", "network-common", "network-shuffle", "launcher", "unsafe", + "test-tags", "sketch" + ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests) = From 57a31e67f3d2ef647d182268e1fa820927d4b6e4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 22 Jan 2016 16:00:52 -0800 Subject: [PATCH 15/16] Excludes sketch from MIMA check --- project/SparkBuild.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e3f7f903616e..4224a65a822b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -243,11 +243,15 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) - // TODO: remove streamingAkka from this list after 2.0.0 - allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl, - networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka, testTags).contains(x)).foreach { - x => enable(MimaBuild.mimaSettings(sparkHome, x))(x) - } + // TODO: remove streamingAkka and sketch from this list after 2.0.0 + allProjects.filterNot { x => + Seq( + spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, + unsafe, streamingAkka, testTags, sketch + ).contains(x) + }.foreach { x => + enable(MimaBuild.mimaSettings(sparkHome, x))(x) + } /* Unsafe settings */ enable(Unsafe.settings)(unsafe) From 65853adaa563d24eac10f0086190fba9bc5bfc23 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 22 Jan 2016 22:10:22 -0800 Subject: [PATCH 16/16] Addresses comments --- .../java/org/apache/spark/util/sketch/CountMinSketch.java | 6 +++--- .../java/org/apache/spark/util/sketch/Murmur3_x86_32.java | 2 ++ .../main/java/org/apache/spark/util/sketch/Platform.java | 2 ++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java index ab929ba17113..21b161bc74ae 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java @@ -21,7 +21,8 @@ import java.io.OutputStream; /** - * An implementation of Count-Min sketch data structure for the following data types: + * A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in + * sub-linear space. Currently, supported data types include: *
    *
  • {@link Byte}
  • *
  • {@link Short}
  • @@ -29,8 +30,7 @@ *
  • {@link Long}
  • *
  • {@link String}
  • *
- * A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in - * sub-linear space. Each {@link CountMinSketch} is initialized with a random seed, and a pair + * Each {@link CountMinSketch} is initialized with a random seed, and a pair * of parameters: *
    *
  1. relative error (or {@code eps}), and diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java index 475c63eaf1eb..3d1f28bcb911 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java @@ -20,6 +20,8 @@ /** * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction. */ +// This class is duplicated from `org.apache.spark.unsafe.hash.Murmur3_x86_32` to make sure +// spark-sketch has no external dependencies. final class Murmur3_x86_32 { private static final int C1 = 0xcc9e2d51; private static final int C2 = 0x1b873593; diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java index 2a37fb127ac9..75d6a6beec40 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Platform.java @@ -21,6 +21,8 @@ import sun.misc.Unsafe; +// This class is duplicated from `org.apache.spark.unsafe.Platform` to make sure spark-sketch has no +// external dependencies. final class Platform { private static final Unsafe _UNSAFE;