diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index c0b425e72959..d05e0cd13031 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -80,6 +80,24 @@ int getVersionNumber() {
*/
public abstract long bitSize();
+ /**
+ * Swamidass & Baldi (2007) approximation for number of items in a Bloom filter
+ *
+ * n* = - m/k * ln(1- X/m)
+ * where:
+ * n* = the estimated number of items in the Bloom filter,
+ * k = the number of hash functions used (k-fold compression),
+ * m = the length of the filter,
+ * X = the number of bits set to one
+ *
+ * Note: the approximation is not valid when the Bloom filter is close to full
+ * since it yields a diverging value.
+ *
+ * @see
+ * Mathematical Correction for Fingerprint Similarity Measures to Improve Chemical Retrieval
+ */
+ public abstract double approxItems();
+
/**
* Puts an item into this {@code BloomFilter}. Ensures that subsequent invocations of
* {@linkplain #mightContain(Object)} with the same item will always return {@code true}.
@@ -147,6 +165,35 @@ int getVersionNumber() {
*/
public abstract boolean mightContainBinary(byte[] item);
+ /**
+ * Returns a new Bloom filter of the union of two Bloom filters.
+ * Unlike mergeInplace, this will not cause a mutation.
+ * Callers must ensure the bloom filters are appropriately sized to avoid saturating them.
+ *
+ * @param other The bloom filter to union this bloom filter with.
+ * @throws IncompatibleUnionException if {@code isCompatible(other) == false}
+ * @see #approxItems()
+ */
+ public abstract BloomFilterImpl union(BloomFilter other) throws IncompatibleUnionException;
+
+ /**
+ * Swamidass & Baldi (2007) approximation for number of items in the intersection of two Bloom filters
+ *
+ * n(A* ∩ B*) = n(A*) + n(B*) - n(A* ∪ B*)
+ * The approx. of the intersection is the approx. of A plus B minus the approx. of their union
+ *
+ * Running approxItems() directly on A ∩ B leads to overestimation because "some bits in A ∩ B are
+ * set to 1 by chance and do not correspond to a compression of bits present in the uncompressed intersection vector
+ * (A->)* ∩ (B->)*"
+ *
+ * @param other The bloom filter to intersect this bloom filter with.
+ * @throws IncompatibleUnionException if {@code isCompatible(other) == false}
+ * @see #approxItems()
+ * @see
+ * Mathematical Correction for Fingerprint Similarity Measures to Improve Chemical Retrieval
+ */
+ public abstract double approxItemsInIntersection(BloomFilter that) throws IncompatibleUnionException;
+
/**
* Writes out this {@link BloomFilter} to an output stream in binary format. It is the caller's
* responsibility to close the stream.
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
index 92c28bcb56a5..f3b0d3f87107 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
@@ -220,6 +220,49 @@ public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeExcep
return this;
}
+ @Override
+ public double approxItems() {
+ double m = bitSize();
+ return (-m / numHashFunctions) * Math.log(1 - (bits.cardinality() / m));
+ }
+
+ @Override
+ public BloomFilterImpl union(BloomFilter other) throws IncompatibleUnionException {
+ // Duplicates the logic of `isCompatible` here to provide better error message.
+ if (other == null) {
+ throw new IncompatibleUnionException("Cannot union null bloom filters");
+ }
+
+ if (!(other instanceof BloomFilterImpl)) {
+ throw new IncompatibleUnionException(
+ "Cannot union bloom filter of class " + other.getClass().getName()
+ );
+ }
+
+ BloomFilterImpl that = (BloomFilterImpl) other;
+
+ if (this.bitSize() != that.bitSize()) {
+ throw new IncompatibleUnionException("Cannot union bloom filters with different bit size");
+ }
+
+ if (this.numHashFunctions != that.numHashFunctions) {
+ throw new IncompatibleUnionException("Cannot union bloom filters with different number of hash functions");
+ }
+
+ BloomFilterImpl bfUnion = (BloomFilterImpl)BloomFilter.create(bitSize()/Long.SIZE);
+
+ bfUnion.bits.putAll(this.bits);
+ bfUnion.bits.putAll(that.bits);
+ return bfUnion;
+ }
+
+ @Override
+ public double approxItemsInIntersection(BloomFilter that) throws IncompatibleUnionException {
+ BloomFilterImpl union = union(that);
+
+ return this.approxItems() + that.approxItems() - union.approxItems();
+ }
+
@Override
public void writeTo(OutputStream out) throws IOException {
DataOutputStream dos = new DataOutputStream(out);
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/IncompatibleUnionException.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/IncompatibleUnionException.java
new file mode 100644
index 000000000000..5ac68e5d5ed4
--- /dev/null
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/IncompatibleUnionException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.sketch;
+
+public class IncompatibleUnionException extends Exception {
+ public IncompatibleUnionException(String message) {
+ super(message);
+ }
+}
diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
index a0408d2da4df..68612d26c1f8 100644
--- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
+++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
@@ -99,6 +99,20 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite
}
}
+ def testApproxItems[T: ClassTag](): Unit = {
+ test("approxItems") {
+ val filter1 = BloomFilter.create(10)
+ val filter2 = BloomFilter.create(10000)
+ for (x <- 1 to 1000) {
+ filter1.putLong(x)
+ filter2.putLong(x)
+ }
+
+ assert(filter1.approxItems().isInfinite)
+ assert(!filter2.approxItems().isInfinite)
+ }
+ }
+
def testItemType[T: ClassTag](typeName: String, numItems: Int)(itemGen: Random => T): Unit = {
testAccuracy[T](typeName, numItems)(itemGen)
testMergeInPlace[T](typeName, numItems)(itemGen)
@@ -131,4 +145,6 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite
filter1.mergeInPlace(filter2)
}
}
+
+ testApproxItems()
}