Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
*/
public abstract class BloomFilter {

protected BloomFilterStrategy strategy = BloomFilterStrategies.HASH_32;

public enum Version {
/**
* {@code BloomFilter} binary format version 1. All values written in big-endian order:
Expand Down Expand Up @@ -223,6 +225,24 @@ public static long optimalNumOfBits(long expectedNumItems, long maxNumItems, lon
return Math.min(optimalNumOfBits(expectedNumItems, fpp), maxNumOfBits);
}

/**
 * Returns the strategy that corresponds to this filter's current bit size.
 *
 * <p>The result is recomputed on every call by passing {@code bitSize()} to
 * {@link #optimalStrategy(long)}; the {@code strategy} field is not consulted.
 *
 * <p>NOTE(review): this matches the field only if the filter was built with
 * {@code optimalStrategy} applied to the same bit size that {@code bitSize()}
 * reports - confirm that the two cannot diverge.
 *
 * @return the strategy selected for {@code bitSize()}
 */
public BloomFilterStrategy currentStrategy() {
  final long sizeInBits = this.bitSize();
  return optimalStrategy(sizeInBits);
}

/**
 * Picks the hashing strategy for a filter with the given number of bits.
 *
 * @param numBits size of the filter in bits
 * @return {@code BloomFilterStrategies.HASH_128} when {@code numBits} is at least
 *         {@code Integer.MAX_VALUE}, otherwise {@code BloomFilterStrategies.HASH_32}
 */
public static BloomFilterStrategy optimalStrategy(long numBits) {
  if (numBits < Integer.MAX_VALUE) {
    return BloomFilterStrategies.HASH_32;
  }
  return BloomFilterStrategies.HASH_128;
}

/**
* Creates a {@link BloomFilter} with the expected number of insertions and a default expected
* false positive probability of 3%.
Expand Down Expand Up @@ -264,6 +284,7 @@ public static BloomFilter create(long expectedNumItems, long numBits) {
throw new IllegalArgumentException("Number of bits must be positive");
}

return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits),
numBits, optimalStrategy(numBits));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ class BloomFilterImpl extends BloomFilter implements Serializable {

private BitArray bits;

BloomFilterImpl(int numHashFunctions, long numBits) {
this(new BitArray(numBits), numHashFunctions);
BloomFilterImpl(int numHashFunctions, long numBits, BloomFilterStrategy strategy) {
this(new BitArray(numBits), numHashFunctions, strategy);
}

private BloomFilterImpl(BitArray bits, int numHashFunctions) {
private BloomFilterImpl(BitArray bits, int numHashFunctions, BloomFilterStrategy strategy) {
this.bits = bits;
this.numHashFunctions = numHashFunctions;
this.strategy = strategy;
}

// No-arg constructor: leaves the fields unset. readFrom(InputStream) creates an
// instance with it and then fills the instance in through readFrom0.
private BloomFilterImpl() {}
Expand Down Expand Up @@ -77,102 +78,17 @@ public boolean put(Object item) {

/**
 * Adds a string by converting it to bytes with {@code Utils.getBytesFromUTF8String}
 * and delegating to {@link #putBinary(byte[])}.
 *
 * @param item the string to add
 * @return whatever {@code putBinary} returns for the converted bytes
 */
@Override
public boolean putString(String item) {
  final byte[] utf8Bytes = Utils.getBytesFromUTF8String(item);
  return putBinary(utf8Bytes);
}

/**
 * Adds a byte array to the filter.
 *
 * <p>Two 32-bit Murmur3 hashes are computed over the bytes: the first with seed 0,
 * the second seeded with the first. For each {@code i} in
 * {@code 1..numHashFunctions} the value {@code first + i * second} selects one bit,
 * taken modulo the bit array size.
 *
 * @param item the bytes to add
 * @return {@code true} if at least one {@code bits.set} call returned {@code true}
 */
@Override
public boolean putBinary(byte[] item) {
  final int firstHash =
      Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
  final int secondHash =
      Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, firstHash);
  final long totalBits = bits.bitSize();

  boolean anyBitChanged = false;
  for (int round = 1; round <= numHashFunctions; round++) {
    final int mixed = firstHash + (round * secondHash);
    // A negative value is bitwise-complemented, which always yields a non-negative int.
    final int index = mixed < 0 ? ~mixed : mixed;
    // bits.set is invoked on every round, regardless of earlier results.
    if (bits.set(index % totalBits)) {
      anyBitChanged = true;
    }
  }
  return anyBitChanged;
}

/**
 * Tests a string by converting it to bytes with {@code Utils.getBytesFromUTF8String}
 * and delegating to {@link #mightContainBinary(byte[])}.
 *
 * @param item the string to look up
 * @return whatever {@code mightContainBinary} returns for the converted bytes
 */
@Override
public boolean mightContainString(String item) {
  final byte[] utf8Bytes = Utils.getBytesFromUTF8String(item);
  return mightContainBinary(utf8Bytes);
}

@Override
public boolean mightContainBinary(byte[] item) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);

long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = h1 + (i * h2);
// Flip all the bits if it's negative (guaranteed positive number)
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
if (!bits.get(combinedHash % bitSize)) {
return false;
}
}
return true;
return strategy.putString(item, bits, numHashFunctions);
}

@Override
public boolean putLong(long item) {
// Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
// hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
// Note that `CountMinSketch` use a different strategy, it hash the input long element with
// every i to produce n hash values.
// TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
int h1 = Murmur3_x86_32.hashLong(item, 0);
int h2 = Murmur3_x86_32.hashLong(item, h1);

long bitSize = bits.bitSize();
boolean bitsChanged = false;
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = h1 + (i * h2);
// Flip all the bits if it's negative (guaranteed positive number)
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
bitsChanged |= bits.set(combinedHash % bitSize);
}
return bitsChanged;
return strategy.putLong(item, bits, numHashFunctions);
}

/**
 * Tests whether a long value may have been added to the filter.
 *
 * <p>Two 32-bit Murmur3 hashes are computed from the value: the first with seed 0,
 * the second seeded with the first. For each {@code i} in
 * {@code 1..numHashFunctions} the bit at {@code first + i * second} (modulo the bit
 * array size) is read.
 *
 * @param item the value to look up
 * @return {@code false} as soon as one probed bit is not set, {@code true} if every
 *         probed bit is set
 */
@Override
public boolean mightContainLong(long item) {
  final int firstHash = Murmur3_x86_32.hashLong(item, 0);
  final int secondHash = Murmur3_x86_32.hashLong(item, firstHash);
  final long totalBits = bits.bitSize();

  int round = 1;
  while (round <= numHashFunctions) {
    final int mixed = firstHash + (round * secondHash);
    // A negative value is bitwise-complemented, which always yields a non-negative int.
    final int index = mixed < 0 ? ~mixed : mixed;
    final boolean bitIsSet = bits.get(index % totalBits);
    if (!bitIsSet) {
      return false;
    }
    round++;
  }
  return true;
}

@Override
public boolean mightContain(Object item) {
if (item instanceof String str) {
return mightContainString(str);
} else if (item instanceof byte[] bytes) {
return mightContainBinary(bytes);
} else {
return mightContainLong(Utils.integralToLong(item));
}
public boolean putBinary(byte[] item) {
return strategy.putBinary(item, bits, numHashFunctions);
}

@Override
Expand Down Expand Up @@ -204,13 +120,8 @@ public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeE
return this;
}

/**
 * Reports the cardinality of the underlying bit array.
 *
 * @return the value of {@code bits.cardinality()}
 */
@Override
public long cardinality() {
  final long count = bits.cardinality();
  return count;
}

private BloomFilterImpl checkCompatibilityForMerge(BloomFilter other)
throws IncompatibleMergeException {
throws IncompatibleMergeException {
// Duplicates the logic of `isCompatible` here to provide better error message.
if (other == null) {
throw new IncompatibleMergeException("Cannot merge null bloom filter");
Expand All @@ -234,6 +145,37 @@ private BloomFilterImpl checkCompatibilityForMerge(BloomFilter other)
return that;
}

/**
 * Reports the cardinality of the underlying bit array.
 *
 * @return the value of {@code bits.cardinality()}
 */
@Override
public long cardinality() {
  final long count = bits.cardinality();
  return count;
}

/**
 * Tests whether an item may have been added, dispatching on its runtime type.
 *
 * <p>A {@code String} goes to {@link #mightContainString(String)}, a {@code byte[]}
 * to {@link #mightContainBinary(byte[])}; anything else is converted with
 * {@code Utils.integralToLong} and passed to {@link #mightContainLong(long)}.
 *
 * @param item the item to look up
 * @return the result of the type-specific lookup
 */
@Override
public boolean mightContain(Object item) {
  if (item instanceof String text) {
    return mightContainString(text);
  }
  if (item instanceof byte[] rawBytes) {
    return mightContainBinary(rawBytes);
  }
  final long asLong = Utils.integralToLong(item);
  return mightContainLong(asLong);
}

/**
 * Tests a string by delegating to the configured strategy, passing it this
 * filter's bit array and number of hash functions.
 *
 * @param item the string to look up
 * @return the strategy's answer
 */
@Override
public boolean mightContainString(String item) {
  final boolean maybePresent = strategy.mightContainString(item, bits, numHashFunctions);
  return maybePresent;
}

/**
 * Tests a long value by delegating to the configured strategy, passing it this
 * filter's bit array and number of hash functions.
 *
 * @param item the value to look up
 * @return the strategy's answer
 */
@Override
public boolean mightContainLong(long item) {
  final boolean maybePresent = strategy.mightContainLong(item, bits, numHashFunctions);
  return maybePresent;
}

/**
 * Tests a byte array by delegating to the configured strategy, passing it this
 * filter's bit array and number of hash functions.
 *
 * @param item the bytes to look up
 * @return the strategy's answer
 */
@Override
public boolean mightContainBinary(byte[] item) {
  final boolean maybePresent = strategy.mightContainBinary(item, bits, numHashFunctions);
  return maybePresent;
}

@Override
public void writeTo(OutputStream out) throws IOException {
DataOutputStream dos = new DataOutputStream(out);
Expand All @@ -243,18 +185,6 @@ public void writeTo(OutputStream out) throws IOException {
bits.writeTo(dos);
}

/**
 * Populates this instance from a serialized stream.
 *
 * <p>Reads, in order: an int version number (must equal
 * {@code Version.V1.getVersionNumber()}), an int number of hash functions, and the
 * bit array via {@code BitArray.readFrom}.
 *
 * @param in the stream to read from
 * @throws IOException if the version number is not the V1 number, or if reading fails
 */
private void readFrom0(InputStream in) throws IOException {
  final DataInputStream input = new DataInputStream(in);

  final int serializedVersion = input.readInt();
  final boolean supported = serializedVersion == Version.V1.getVersionNumber();
  if (!supported) {
    throw new IOException("Unexpected Bloom filter version number (" + serializedVersion + ")");
  }

  this.numHashFunctions = input.readInt();
  this.bits = BitArray.readFrom(input);
}

public static BloomFilterImpl readFrom(InputStream in) throws IOException {
BloomFilterImpl filter = new BloomFilterImpl();
filter.readFrom0(in);
Expand All @@ -267,6 +197,19 @@ public static BloomFilterImpl readFrom(byte[] bytes) throws IOException {
}
}

/**
 * Populates this instance from a serialized stream.
 *
 * <p>Reads, in order: an int version number (must equal
 * {@code Version.V1.getVersionNumber()}), an int number of hash functions, and the
 * bit array via {@code BitArray.readFrom}. The strategy is not read from the
 * stream; it is set from {@code currentStrategy()} once the bit array is loaded.
 *
 * <p>NOTE(review): because the strategy is inferred rather than stored, a stream
 * written by code that used a different strategy for the same bit size would be
 * probed with the wrong hashing scheme - confirm compatibility with existing data.
 *
 * @param in the stream to read from
 * @throws IOException if the version number is not the V1 number, or if reading fails
 */
private void readFrom0(InputStream in) throws IOException {
  final DataInputStream input = new DataInputStream(in);

  final int serializedVersion = input.readInt();
  final boolean supported = serializedVersion == Version.V1.getVersionNumber();
  if (!supported) {
    throw new IOException("Unexpected Bloom filter version number (" + serializedVersion + ")");
  }

  this.numHashFunctions = input.readInt();
  this.bits = BitArray.readFrom(input);
  // Must come after the bit array is assigned: the strategy is derived from it.
  this.strategy = currentStrategy();
}

/**
 * Writes this filter to the object stream by forwarding to
 * {@code writeTo(OutputStream)}.
 *
 * @param out the object stream to write to
 * @throws IOException if {@code writeTo} fails
 */
private void writeObject(ObjectOutputStream out) throws IOException {
  this.writeTo(out);
}
Expand Down
Loading