From b19c47235d5a2a3e22f08b290ad05dff5cfbdb67 Mon Sep 17 00:00:00 2001
From: Xianglin Zhang
Date: Fri, 31 Oct 2025 11:43:39 -0700
Subject: [PATCH 1/4] make public

---
 parquet/src/bloom_filter/mod.rs | 49 +++++++++++++++++----------------
 1 file changed, 25 insertions(+), 24 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 290a887b2960..2b374c87e436 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -244,29 +244,8 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
 }
 
 impl Sbbf {
-    /// Create a new [Sbbf] with the given number of distinct values and false positive probability.
-    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
-    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self> {
-        if !(0.0..1.0).contains(&fpp) {
-            return Err(ParquetError::General(format!(
-                "False positive probability must be between 0.0 and 1.0, got {fpp}"
-            )));
-        }
-        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
-        Ok(Self::new_with_num_of_bytes(num_bits / 8))
-    }
-
-    /// Create a new [Sbbf] with the given number of bytes; the exact number of bytes will be
-    /// adjusted to the next power of two, bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
-    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
-        let num_bytes = optimal_num_of_bytes(num_bytes);
-        assert_eq!(num_bytes % size_of::<Block>(), 0);
-        let num_blocks = num_bytes / size_of::<Block>();
-        let bitset = vec![Block::ZERO; num_blocks];
-        Self(bitset)
-    }
-
-    pub(crate) fn new(bitset: &[u8]) -> Self {
+    /// Create a new [Sbbf] from raw bitset bytes.
+    pub fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
             .map(|chunk| {
@@ -283,7 +262,7 @@ impl Sbbf {
     /// Write the bloom filter data (header and then bitset) to the output. This doesn't
     /// flush the writer in order to boost performance of bulk writing all blocks. Caller
     /// must remember to flush the writer.
-    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
         let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
         self.header().write_thrift(&mut protocol).map_err(|e| {
             ParquetError::General(format!("Could not write bloom filter header: {e}"))
         })?;
@@ -292,6 +271,28 @@ impl Sbbf {
         Ok(())
     }
 
+    /// Create a new [Sbbf] with the given number of distinct values and false positive probability.
+    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
+    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self> {
+        if !(0.0..1.0).contains(&fpp) {
+            return Err(ParquetError::General(format!(
+                "False positive probability must be between 0.0 and 1.0, got {fpp}"
+            )));
+        }
+        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
+        Ok(Self::new_with_num_of_bytes(num_bits / 8))
+    }
+
+    /// Create a new [Sbbf] with the given number of bytes; the exact number of bytes will be
+    /// adjusted to the next power of two, bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
+    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+        let num_bytes = optimal_num_of_bytes(num_bytes);
+        assert_eq!(num_bytes % size_of::<Block>(), 0);
+        let num_blocks = num_bytes / size_of::<Block>();
+        let bitset = vec![Block::ZERO; num_blocks];
+        Self(bitset)
+    }
+
     /// Write the bitset in serialized form to the writer.
     #[cfg(not(target_endian = "little"))]
     fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {

From 95db76bb61eb7f440e28b9e2aa12609b93493cb8 Mon Sep 17 00:00:00 2001
From: Xianglin Zhang
Date: Fri, 31 Oct 2025 12:01:15 -0700
Subject: [PATCH 2/4] add tests

---
 parquet/src/bloom_filter/mod.rs | 51 +++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 2b374c87e436..69bf2a90e5a5 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -477,6 +477,57 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_sbbf_new_parses_little_endian_blocks() {
+        let words: [u32; 16] = [
+            0x0001_0203,
+            0x0405_0607,
+            0x0809_0a0b,
+            0x0c0d_0e0f,
+            0x1011_1213,
+            0x1415_1617,
+            0x1819_1a1b,
+            0x1c1d_1e1f,
+            0x2021_2223,
+            0x2425_2627,
+            0x2829_2a2b,
+            0x2c2d_2e2f,
+            0x3031_3233,
+            0x3435_3637,
+            0x3839_3a3b,
+            0x3c3d_3e3f,
+        ];
+        let mut bitset = Vec::with_capacity(words.len() * 4);
+        for word in &words {
+            bitset.extend_from_slice(&word.to_le_bytes());
+        }
+        let sbbf = Sbbf::new(&bitset);
+        assert_eq!(sbbf.0.len(), 2);
+        for (block_index, block) in sbbf.0.iter().enumerate() {
+            for word_index in 0..8 {
+                let overall_index = block_index * 8 + word_index;
+                assert_eq!(block[word_index], words[overall_index]);
+            }
+        }
+    }
+
+    #[test]
+    fn test_sbbf_write_round_trip() {
+        let bitset: Vec<u8> = (0u8..64).collect();
+        let sbbf = Sbbf::new(&bitset);
+        let mut output = Vec::new();
+        sbbf.write(&mut output).unwrap();
+
+        let mut protocol = ThriftSliceInputProtocol::new(&output);
+        let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
+        assert_eq!(header.num_bytes, bitset.len() as i32);
+        assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
+        assert_eq!(header.hash, BloomFilterHash::XXHASH);
+        assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);
+
+        assert_eq!(protocol.as_slice(), bitset.as_slice());
+    }
+
     /// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
     /// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
     /// so altogether it'll be 20 bytes at most.

From 93afaec059c3dad5e68dabfbada9c4dbf64a78e4 Mon Sep 17 00:00:00 2001
From: Xianglin Zhang
Date: Thu, 6 Nov 2025 16:04:13 -0800
Subject: [PATCH 3/4] add doc example

---
 parquet/src/bloom_filter/mod.rs | 40 ++++++++++++++++++++++++++++++--
 1 file changed, 38 insertions(+), 2 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 69bf2a90e5a5..ffa9c4c9a62b 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -116,7 +116,7 @@ pub struct BloomFilterHeader {
 
 /// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
 /// Each word is thought of as an array of bits; each bit is either "set" or "not set".
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 #[repr(transparent)]
 struct Block([u32; 8]);
 impl Block {
@@ -195,7 +195,7 @@ impl std::ops::IndexMut<usize> for Block {
 ///
 /// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
 /// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Sbbf(Vec<Block>);
 
 pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -245,6 +245,42 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
 
 impl Sbbf {
     /// Create a new [Sbbf] from raw bitset bytes.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use parquet::errors::Result;
+    /// # use parquet::bloom_filter::Sbbf;
+    /// # fn main() -> Result<()> {
+    /// // In a real application you would read a serialized bloom filter from a Parquet file.
+    /// // For example, using ParquetRecordBatchStreamBuilder:
+    /// //
+    /// // let file = tokio::fs::File::open("data.parquet").await?;
+    /// // let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+    /// // let bloom_filter = builder
+    /// //     .get_row_group_column_bloom_filter(row_group_index, column_index)
+    /// //     .await?;
+    /// //
+    /// // For this example we handcraft a 32-byte bitset.
+    /// let bitset_bytes = vec![0u8; 32];
+    /// let original = Sbbf::new(&bitset_bytes);
+    ///
+    /// // Persist the filter (header + bitset) into an in-memory buffer.
+    /// let mut serialized = Vec::new();
+    /// original.write(&mut serialized)?;
+    ///
+    /// // When reading the filter back, reuse the bitset portion of the buffer.
+    /// let bitset_slice = &serialized[serialized.len() - bitset_bytes.len()..];
+    /// let reconstructed = Sbbf::new(bitset_slice);
+    ///
+    /// assert_eq!(reconstructed, original);
+    /// # Ok(())
+    /// # }
+    /// ```
+    ///
+    /// A practical way to obtain a correctly sized bitset slice for this constructor is to
+    /// serialize an existing filter with [`Sbbf::write`] and reuse the bitset bytes that follow
+    /// the header.
     pub fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)

From b65d20767c7510570cff0ab0154a268c29a3f712 Mon Sep 17 00:00:00 2001
From: Xianglin Zhang
Date: Fri, 21 Nov 2025 12:04:29 -0800
Subject: [PATCH 4/4] add two more functions

---
 parquet/src/bloom_filter/mod.rs | 35 +++++++++++++++++++++++++++++----
 1 file changed, 31 insertions(+), 4 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index ffa9c4c9a62b..e1f36de50856 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -84,6 +84,7 @@ use crate::parquet_thrift::{
 use crate::thrift_struct;
 use bytes::Bytes;
 use std::io::Write;
+use std::mem::size_of;
 use twox_hash::XxHash64;
 
 /// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
@@ -270,17 +271,18 @@ impl Sbbf {
     /// original.write(&mut serialized)?;
     ///
     /// // When reading the filter back, reuse the bitset portion of the buffer.
-    /// let bitset_slice = &serialized[serialized.len() - bitset_bytes.len()..];
-    /// let reconstructed = Sbbf::new(bitset_slice);
+    /// let bitset_len = original.bitset_len();
+    /// let reconstructed = Sbbf::new(&serialized[serialized.len() - bitset_len..]);
     ///
-    /// assert_eq!(reconstructed, original);
+    /// assert_eq!(reconstructed.as_slice(), original.as_slice());
     /// # Ok(())
     /// # }
     /// ```
     ///
     /// A practical way to obtain a correctly sized bitset slice for this constructor is to
     /// serialize an existing filter with [`Sbbf::write`] and reuse the bitset bytes that follow
-    /// the header.
+    /// the header; the bitset length can be computed with [`Sbbf::bitset_len`], or the bytes
+    /// obtained directly with [`Sbbf::as_slice`].
     pub fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
@@ -307,6 +309,22 @@ impl Sbbf {
         Ok(())
     }
 
+    /// Returns the raw bitset bytes encoded in little-endian order.
+    pub fn as_slice(&self) -> Vec<u8> {
+        let mut buffer = Vec::with_capacity(self.bitset_len());
+        for block in &self.0 {
+            for word in &block.0 {
+                buffer.extend_from_slice(&word.to_le_bytes());
+            }
+        }
+        buffer
+    }
+
+    /// Returns the size of the bitset in bytes.
+    pub fn bitset_len(&self) -> usize {
+        self.0.len() * size_of::<Block>()
+    }
+
     /// Create a new [Sbbf] with the given number of distinct values and false positive probability.
     /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
     pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self> {
@@ -564,6 +582,15 @@ mod tests {
         assert_eq!(protocol.as_slice(), bitset.as_slice());
     }
 
+    #[test]
+    fn test_sbbf_as_slice_matches_original_bytes() {
+        let bitset: Vec<u8> = (0u8..64).collect();
+        let sbbf = Sbbf::new(&bitset);
+        let view = sbbf.as_slice();
+        assert_eq!(view, bitset);
+        assert_eq!(sbbf.bitset_len(), view.len());
+    }
+
     /// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
     /// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
     /// so altogether it'll be 20 bytes at most.
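
Taken together, the four patches expose a small public round-trip API on Sbbf:
construct a filter from raw bytes with `Sbbf::new`, serialize it (Thrift header
followed by the little-endian bitset) with `Sbbf::write`, and recover the bitset
bytes with `bitset_len` / `as_slice`. The sketch below restates the doc example
and the `test_sbbf_write_round_trip` test as a standalone program; it is not part
of the patches themselves and assumes a `parquet` build with all four applied.

    use parquet::bloom_filter::Sbbf;
    use parquet::errors::Result;

    fn main() -> Result<()> {
        // Two 32-byte blocks' worth of bitset bytes.
        let bitset: Vec<u8> = (0u8..64).collect();
        let original = Sbbf::new(&bitset);

        // `write` emits the Thrift header followed by the bitset.
        let mut serialized = Vec::new();
        original.write(&mut serialized)?;

        // The bitset occupies the trailing `bitset_len()` bytes of the buffer,
        // so the filter can be reconstructed from that slice alone.
        let tail = &serialized[serialized.len() - original.bitset_len()..];
        let reconstructed = Sbbf::new(tail);

        assert_eq!(reconstructed.as_slice(), original.as_slice());
        assert_eq!(original.as_slice(), bitset);
        Ok(())
    }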