@@ -30,12 +30,16 @@ pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
+use crate::bloom_filter::{
+    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
 use crate::column::page::{PageIterator, PageReader};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
 use crate::schema::types::SchemaDescriptor;
 
 pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
@@ -703,6 +707,66 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
         Self::new_builder(SyncReader(input), metadata)
     }
 
+    /// Read the bloom filter for a column in a row group
+    ///
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// This function should be called after other forms of pruning, such as
+    /// projection and predicate pushdown, have narrowed down the row groups
+    /// and columns of interest.
+    pub fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset
+                .try_into()
+                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
+        } else {
+            return Ok(None);
+        };
+
+        // If the metadata records the bloom filter length, fetch the whole
+        // filter in a single read; otherwise fetch an estimated header prefix
+        let buffer = match column_metadata.bloom_filter_length() {
+            Some(length) => self.input.0.get_bytes(offset, length as usize),
+            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
+        }?;
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm enum
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression enum
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // this match exists to future proof the singleton hash enum
+            }
+        }
+
+        // With a known length the bitset is already in the buffer; otherwise
+        // issue a second read for exactly the number of bytes in the header
+        let bitset = match column_metadata.bloom_filter_length() {
+            Some(_) => buffer.slice(
+                (TryInto::<usize>::try_into(bitset_offset).unwrap()
+                    - TryInto::<usize>::try_into(offset).unwrap())..,
+            ),
+            None => {
+                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+                    ParquetError::General("Bloom filter length is invalid".to_string())
+                })?;
+                self.input.0.get_bytes(bitset_offset, bitset_length)?
+            }
+        };
+        Ok(Some(Sbbf::new(&bitset)))
+    }
+
     /// Build a [`ParquetRecordBatchReader`]
     ///
     /// Note: this will eagerly evaluate any `RowFilter` before returning
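
As a usage note, here is a minimal sketch of driving row-group pruning with this new API from a downstream caller. The file name `data.parquet`, the column index `0`, and the probe value `"Hello"` are hypothetical, not part of this change:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn read_matching_rows() -> parquet::errors::Result<()> {
    let file = File::open("data.parquet")?;
    let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Keep only row groups whose bloom filter might contain "Hello".
    // A `None` bloom filter means the group cannot be pruned, so keep it.
    let mut keep = Vec::new();
    for rg in 0..builder.metadata().num_row_groups() {
        match builder.get_row_group_column_bloom_filter(rg, 0)? {
            Some(sbbf) if !sbbf.check(&"Hello") => {} // definitely absent: skip
            _ => keep.push(rg),
        }
    }

    let reader = builder.with_row_groups(keep).build()?;
    for batch in reader {
        let _batch = batch?;
        // ... evaluate the actual predicate on the surviving row groups ...
    }
    Ok(())
}
```

Because bloom filters can produce false positives but never false negatives, only a negative `check` is safe grounds for skipping a row group.
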
@@ -4720,4 +4784,54 @@ mod tests {
         assert_eq!(c0.len(), c1.len());
         c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
     }
+
+    #[test]
+    fn test_get_row_group_column_bloom_filter_with_length() {
+        // rewrite the test data to a new parquet file so that its metadata
+        // includes bloom_filter_length
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let file = File::open(path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let schema = builder.schema().clone();
+        let reader = builder.build().unwrap();
+
+        let mut parquet_data = Vec::new();
+        let props = WriterProperties::builder()
+            .set_bloom_filter_enabled(true)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
+        for batch in reader {
+            let batch = batch.unwrap();
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        // test the new parquet file
+        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
+    }
+
+    #[test]
+    fn test_get_row_group_column_bloom_filter_without_length() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let data = Bytes::from(std::fs::read(path).unwrap());
+        test_get_row_group_column_bloom_filter(data, false);
+    }
+
+    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
+        let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
+
+        let metadata = builder.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        let column = row_group.column(0);
+        assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+        let sbbf = builder
+            .get_row_group_column_bloom_filter(0, 0)
+            .unwrap()
+            .unwrap();
+        assert!(sbbf.check(&"Hello"));
+        assert!(!sbbf.check(&"Hello_Not_Exists"));
+    }
 }