diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 74b5872aa404..70fe15fe0874 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -107,6 +107,13 @@ bool HiveConfig::isFileColumnNamesReadAsLowerCase( config_->get(kFileColumnNamesReadAsLowerCase, false)); } +bool HiveConfig::isParquetReadBloomFilter( + const config::ConfigBase* session) const { + return session->get( + kParquetReadBloomFilterSession, + config_->get(kParquetReadBloomFilter, false)); +} + bool HiveConfig::isPartitionPathAsLowerCase( const config::ConfigBase* session) const { return session->get(kPartitionPathAsLowerCaseSession, true); diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 38becf08415d..996b9fc7de7b 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -80,6 +80,13 @@ class HiveConfig { static constexpr const char* kParquetUseColumnNamesSession = "parquet_use_column_names"; + // Read bloom filters from parquet files to filter row groups. + static constexpr const char* kParquetReadBloomFilter = + "hive.parquet.read-bloom-filter"; + + static constexpr const char* kParquetReadBloomFilterSession = + "hive_parquet_read_bloom_filter"; + /// Reads the source file column name as lower case. 
static constexpr const char* kFileColumnNamesReadAsLowerCase = "file-column-names-read-as-lower-case"; @@ -199,6 +206,8 @@ class HiveConfig { bool isFileColumnNamesReadAsLowerCase( const config::ConfigBase* session) const; + bool isParquetReadBloomFilter(const config::ConfigBase* session) const; + bool isPartitionPathAsLowerCase(const config::ConfigBase* session) const; bool allowNullPartitionKeys(const config::ConfigBase* session) const; diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 1ccc4adb9c36..113443eefd7a 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -603,6 +603,11 @@ void configureReaderOptions( readerOptions.setFileFormat(hiveSplit->fileFormat); } + + if (readerOptions.fileFormat() == dwio::common::FileFormat::PARQUET) { + readerOptions.setReadBloomFilter( + hiveConfig->isParquetReadBloomFilter(sessionProperties)); + } } void configureRowReaderOptions( diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp index 3522eec64abb..deea5ab518e4 100644 --- a/velox/connectors/hive/tests/HiveConfigTest.cpp +++ b/velox/connectors/hive/tests/HiveConfigTest.cpp @@ -37,7 +37,7 @@ TEST(HiveConfigTest, defaultConfig) { ASSERT_EQ(hiveConfig.gcsCredentialsPath(), ""); ASSERT_FALSE(hiveConfig.isOrcUseColumnNames(emptySession.get())); ASSERT_FALSE(hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get())); - + ASSERT_FALSE(hiveConfig.isParquetReadBloomFilter(emptySession.get())); ASSERT_EQ(hiveConfig.maxCoalescedBytes(emptySession.get()), 128 << 20); ASSERT_EQ( hiveConfig.maxCoalescedDistanceBytes(emptySession.get()), 512 << 10); @@ -64,6 +64,7 @@ TEST(HiveConfigTest, overrideConfig) { {HiveConfig::kGcsCredentialsPath, "hey"}, {HiveConfig::kOrcUseColumnNames, "true"}, {HiveConfig::kFileColumnNamesReadAsLowerCase, "true"}, + {HiveConfig::kParquetReadBloomFilter, "true"}, 
{HiveConfig::kAllowNullPartitionKeys, "false"}, {HiveConfig::kMaxCoalescedBytes, "100"}, {HiveConfig::kMaxCoalescedDistance, "100kB"}, @@ -92,6 +93,7 @@ TEST(HiveConfigTest, overrideConfig) { ASSERT_EQ(hiveConfig.maxCoalescedBytes(emptySession.get()), 100); ASSERT_EQ( hiveConfig.maxCoalescedDistanceBytes(emptySession.get()), 100 << 10); + ASSERT_TRUE(hiveConfig.isParquetReadBloomFilter(emptySession.get())); ASSERT_EQ(hiveConfig.numCacheFileHandles(), 100); ASSERT_FALSE(hiveConfig.isFileHandleCacheEnabled()); ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(emptySession.get()), 100); diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 987e0eb76462..2d112a7b6a2b 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -497,6 +497,11 @@ class ReaderOptions : public io::ReaderOptions { return *this; } + ReaderOptions& setReadBloomFilter(bool flag) { + readBloomFilter_ = flag; + return *this; + } + ReaderOptions& setIOExecutor(std::shared_ptr executor) { ioExecutor_ = std::move(executor); return *this; @@ -567,6 +572,10 @@ class ReaderOptions : public io::ReaderOptions { return useColumnNamesForColumnMapping_; } + bool readBloomFilter() const { + return readBloomFilter_; + } + const std::shared_ptr& randomSkip() const { return randomSkip_; } @@ -609,6 +618,7 @@ class ReaderOptions : public io::ReaderOptions { uint64_t filePreloadThreshold_{kDefaultFilePreloadThreshold}; bool fileColumnNamesReadAsLowerCase_{false}; bool useColumnNamesForColumnMapping_{false}; + bool readBloomFilter_{false}; std::shared_ptr ioExecutor_; std::shared_ptr randomSkip_; std::shared_ptr scanSpec_; diff --git a/velox/dwio/parquet/common/BloomFilter.h b/velox/dwio/parquet/common/BloomFilter.h index 9a2d9d47fec0..5eb196684bab 100644 --- a/velox/dwio/parquet/common/BloomFilter.h +++ b/velox/dwio/parquet/common/BloomFilter.h @@ -68,31 +68,31 @@ class BloomFilter { /// /// @param value the value to hash. /// @return hash result. 
- virtual uint64_t hash(int32_t value) const = 0; + virtual uint64_t hashInt32(int32_t value) const = 0; /// Compute hash for 64 bits value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(int64_t value) const = 0; + virtual uint64_t hashInt64(int64_t value) const = 0; /// Compute hash for float value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(float value) const = 0; + virtual uint64_t hashFloat(float value) const = 0; /// Compute hash for double value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(double value) const = 0; + virtual uint64_t hashDouble(double value) const = 0; /// Compute hash for bytearray by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(const ByteArray* value) const = 0; + virtual uint64_t hashByteArray(const ByteArray* value) const = 0; /// Batch compute hashes for 32 bits values by using its plain encoding /// result. @@ -101,8 +101,8 @@ class BloomFilter { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const int32_t* values, int numValues, uint64_t* hashes) - const = 0; + virtual void + hashesInt32(const int32_t* values, int numValues, uint64_t* hashes) const = 0; /// Batch compute hashes for 64 bits values by using its plain encoding /// result. @@ -111,8 +111,8 @@ class BloomFilter { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. 
- virtual void hashes(const int64_t* values, int numValues, uint64_t* hashes) - const = 0; + virtual void + hashesInt64(const int64_t* values, int numValues, uint64_t* hashes) const = 0; /// Batch compute hashes for float values by using its plain encoding result. /// @@ -120,7 +120,7 @@ class BloomFilter { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const float* values, int numValues, uint64_t* hashes) + virtual void hashesFloat(const float* values, int numValues, uint64_t* hashes) const = 0; /// Batch compute hashes for double values by using its plain encoding result. @@ -129,8 +129,8 @@ class BloomFilter { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const double* values, int numValues, uint64_t* hashes) - const = 0; + virtual void + hashesDouble(const double* values, int numValues, uint64_t* hashes) const = 0; /// Batch compute hashes for bytearray values by using its plain encoding /// result. @@ -139,8 +139,10 @@ class BloomFilter { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. 
- virtual void hashes(const ByteArray* values, int numValues, uint64_t* hashes) - const = 0; + virtual void hashesByteArray( + const ByteArray* values, + int numValues, + uint64_t* hashes) const = 0; virtual ~BloomFilter() = default; @@ -248,54 +250,41 @@ class BlockSplitBloomFilter : public BloomFilter { return numBytes_; } - uint64_t hash(int32_t value) const override { - return hasher_->hash(value); + uint64_t hashInt32(int32_t value) const override { + return hasher_->hashInt32(value); } - uint64_t hash(int64_t value) const override { - return hasher_->hash(value); + uint64_t hashInt64(int64_t value) const override { + return hasher_->hashInt64(value); } - uint64_t hash(float value) const override { - return hasher_->hash(value); + uint64_t hashFloat(float value) const override { + return hasher_->hashFloat(value); } - uint64_t hash(double value) const override { - return hasher_->hash(value); + uint64_t hashDouble(double value) const override { + return hasher_->hashDouble(value); } - uint64_t hash(const ByteArray* value) const override { - return hasher_->hash(value); + uint64_t hashByteArray(const ByteArray* value) const override { + return hasher_->hashByteArray(value); } - void hashes(const int32_t* values, int numValues, uint64_t* hashes) + void hashesInt32(const int32_t* values, int numValues, uint64_t* hashes) const override { - hasher_->hashes(values, numValues, hashes); + hasher_->hashesInt32(values, numValues, hashes); } - void hashes(const int64_t* values, int numValues, uint64_t* hashes) + void hashesInt64(const int64_t* values, int numValues, uint64_t* hashes) const override { - hasher_->hashes(values, numValues, hashes); + hasher_->hashesInt64(values, numValues, hashes); } - void hashes(const float* values, int numValues, uint64_t* hashes) + void hashesFloat(const float* values, int numValues, uint64_t* hashes) const override { - hasher_->hashes(values, numValues, hashes); + hasher_->hashesFloat(values, numValues, hashes); } - void hashes(const 
double* values, int numValues, uint64_t* hashes) + void hashesDouble(const double* values, int numValues, uint64_t* hashes) const override { - hasher_->hashes(values, numValues, hashes); + hasher_->hashesDouble(values, numValues, hashes); } - void hashes(const ByteArray* values, int numValues, uint64_t* hashes) + void hashesByteArray(const ByteArray* values, int numValues, uint64_t* hashes) const override { - hasher_->hashes(values, numValues, hashes); - } - - uint64_t hash(const int32_t* value) const { - return hasher_->hash(*value); - } - uint64_t hash(const int64_t* value) const { - return hasher_->hash(*value); - } - uint64_t hash(const float* value) const { - return hasher_->hash(*value); - } - uint64_t hash(const double* value) const { - return hasher_->hash(*value); + hasher_->hashesByteArray(values, numValues, hashes); } /// Deserialize the Bloom filter from an input stream. It is used when diff --git a/velox/dwio/parquet/common/CMakeLists.txt b/velox/dwio/parquet/common/CMakeLists.txt index 4e3edf6687ce..fb6f4d4577c4 100644 --- a/velox/dwio/parquet/common/CMakeLists.txt +++ b/velox/dwio/parquet/common/CMakeLists.txt @@ -17,7 +17,8 @@ velox_add_library( BloomFilter.cpp XxHasher.cpp LevelComparison.cpp - LevelConversion.cpp) + LevelConversion.cpp + ParquetBloomFilter.h) velox_link_libraries( velox_dwio_parquet_common diff --git a/velox/dwio/parquet/common/Hasher.h b/velox/dwio/parquet/common/Hasher.h index 3f3a907d06b4..b40b1167b624 100644 --- a/velox/dwio/parquet/common/Hasher.h +++ b/velox/dwio/parquet/common/Hasher.h @@ -47,31 +47,31 @@ class Hasher { /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(int32_t value) const = 0; + virtual uint64_t hashInt32(int32_t value) const = 0; /// Compute hash for 64 bits value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. 
- virtual uint64_t hash(int64_t value) const = 0; + virtual uint64_t hashInt64(int64_t value) const = 0; /// Compute hash for float value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(float value) const = 0; + virtual uint64_t hashFloat(float value) const = 0; /// Compute hash for double value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(double value) const = 0; + virtual uint64_t hashDouble(double value) const = 0; /// Compute hash for ByteArray value by using its plain encoding result. /// /// @param value the value to hash. /// @return hash result. - virtual uint64_t hash(const ByteArray* value) const = 0; + virtual uint64_t hashByteArray(const ByteArray* value) const = 0; /// Batch compute hashes for 32 bits values by using its plain encoding /// result. @@ -80,8 +80,10 @@ class Hasher { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const int32_t* values, int num_values, uint64_t* hashes) - const = 0; + virtual void hashesInt32( + const int32_t* values, + int num_values, + uint64_t* hashes) const = 0; /// Batch compute hashes for 64 bits values by using its plain encoding /// result. @@ -90,8 +92,10 @@ class Hasher { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const int64_t* values, int num_values, uint64_t* hashes) - const = 0; + virtual void hashesInt64( + const int64_t* values, + int num_values, + uint64_t* hashes) const = 0; /// Batch compute hashes for float values by using its plain encoding result. /// @@ -99,8 +103,8 @@ class Hasher { /// @param num_values the number of values to hash. 
/// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const float* values, int num_values, uint64_t* hashes) - const = 0; + virtual void + hashesFloat(const float* values, int num_values, uint64_t* hashes) const = 0; /// Batch compute hashes for double values by using its plain encoding result. /// @@ -108,8 +112,10 @@ class Hasher { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const double* values, int num_values, uint64_t* hashes) - const = 0; + virtual void hashesDouble( + const double* values, + int num_values, + uint64_t* hashes) const = 0; /// Batch compute hashes for ByteArray values by using its plain encoding /// result. @@ -118,8 +124,10 @@ class Hasher { /// @param num_values the number of values to hash. /// @param hashes a pointer to the output hash values, its length should be /// equal to num_values. - virtual void hashes(const ByteArray* values, int num_values, uint64_t* hashes) - const = 0; + virtual void hashesByteArray( + const ByteArray* values, + int num_values, + uint64_t* hashes) const = 0; virtual ~Hasher() = default; }; diff --git a/velox/dwio/parquet/common/ParquetBloomFilter.h b/velox/dwio/parquet/common/ParquetBloomFilter.h new file mode 100644 index 000000000000..1cf4f0ffea27 --- /dev/null +++ b/velox/dwio/parquet/common/ParquetBloomFilter.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/common/BloomFilter.h" +#include "velox/type/Filter.h" + +namespace facebook::velox::parquet { + +class ParquetBloomFilter final : public common::AbstractBloomFilter { + public: + ParquetBloomFilter( + std::shared_ptr bloomFilter) + : bloomFilter_(bloomFilter) {} + + bool mightContainInt32(int32_t value) const override { + return bloomFilter_->findHash(bloomFilter_->hashInt32(value)); + } + + bool mightContainInt64(int64_t value) const override { + return bloomFilter_->findHash(bloomFilter_->hashInt64(value)); + } + + bool mightContainString(const std::string& value) const override { + ByteArray byteArray{value}; + return bloomFilter_->findHash(bloomFilter_->hashByteArray(&byteArray)); + } + + private: + std::shared_ptr bloomFilter_; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/common/XxHasher.cpp b/velox/dwio/parquet/common/XxHasher.cpp index df189a972b6e..8904b3d601b5 100644 --- a/velox/dwio/parquet/common/XxHasher.cpp +++ b/velox/dwio/parquet/common/XxHasher.cpp @@ -42,51 +42,59 @@ void XxHashesHelper( } // namespace -uint64_t XxHasher::hash(int32_t value) const { +uint64_t XxHasher::hashInt32(int32_t value) const { return XxHashHelper(value, kParquetBloomXxHashSeed); } -uint64_t XxHasher::hash(int64_t value) const { +uint64_t XxHasher::hashInt64(int64_t value) const { return XxHashHelper(value, kParquetBloomXxHashSeed); } -uint64_t XxHasher::hash(float value) const { +uint64_t XxHasher::hashFloat(float value) const { return XxHashHelper(value, 
kParquetBloomXxHashSeed); } -uint64_t XxHasher::hash(double value) const { +uint64_t XxHasher::hashDouble(double value) const { return XxHashHelper(value, kParquetBloomXxHashSeed); } -uint64_t XxHasher::hash(const ByteArray* value) const { +uint64_t XxHasher::hashByteArray(const ByteArray* value) const { return XXH64( reinterpret_cast(value->ptr), value->len, kParquetBloomXxHashSeed); } -void XxHasher::hashes(const int32_t* values, int numValues, uint64_t* hashes) - const { +void XxHasher::hashesInt32( + const int32_t* values, + int numValues, + uint64_t* hashes) const { XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes); } -void XxHasher::hashes(const int64_t* values, int numValues, uint64_t* hashes) - const { +void XxHasher::hashesInt64( + const int64_t* values, + int numValues, + uint64_t* hashes) const { XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes); } -void XxHasher::hashes(const float* values, int numValues, uint64_t* hashes) +void XxHasher::hashesFloat(const float* values, int numValues, uint64_t* hashes) const { XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes); } -void XxHasher::hashes(const double* values, int numValues, uint64_t* hashes) - const { +void XxHasher::hashesDouble( + const double* values, + int numValues, + uint64_t* hashes) const { XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes); } -void XxHasher::hashes(const ByteArray* values, int numValues, uint64_t* hashes) - const { +void XxHasher::hashesByteArray( + const ByteArray* values, + int numValues, + uint64_t* hashes) const { for (int i = 0; i < numValues; ++i) { hashes[i] = XXH64( reinterpret_cast(values[i].ptr), diff --git a/velox/dwio/parquet/common/XxHasher.h b/velox/dwio/parquet/common/XxHasher.h index 07c37f762f36..0d830eaf4722 100644 --- a/velox/dwio/parquet/common/XxHasher.h +++ b/velox/dwio/parquet/common/XxHasher.h @@ -26,22 +26,24 @@ namespace facebook::velox::parquet { class XxHasher : public Hasher { public: 
- uint64_t hash(int32_t value) const override; - uint64_t hash(int64_t value) const override; - uint64_t hash(float value) const override; - uint64_t hash(double value) const override; - uint64_t hash(const ByteArray* value) const override; + uint64_t hashInt32(int32_t value) const override; + uint64_t hashInt64(int64_t value) const override; + uint64_t hashFloat(float value) const override; + uint64_t hashDouble(double value) const override; + uint64_t hashByteArray(const ByteArray* value) const override; - void hashes(const int32_t* values, int numValues, uint64_t* hashes) + void hashesInt32(const int32_t* values, int numValues, uint64_t* hashes) const override; - void hashes(const int64_t* values, int numValues, uint64_t* hashes) + void hashesInt64(const int64_t* values, int numValues, uint64_t* hashes) const override; - void hashes(const float* values, int numValues, uint64_t* hashes) + void hashesFloat(const float* values, int numValues, uint64_t* hashes) const override; - void hashes(const double* values, int numValues, uint64_t* hashes) - const override; - virtual void hashes(const ByteArray* values, int numValues, uint64_t* hashes) + void hashesDouble(const double* values, int numValues, uint64_t* hashes) const override; + virtual void hashesByteArray( + const ByteArray* values, + int numValues, + uint64_t* hashes) const override; static constexpr int kParquetBloomXxHashSeed = 0; }; diff --git a/velox/dwio/parquet/reader/Metadata.cpp b/velox/dwio/parquet/reader/Metadata.cpp index 771e68e8a595..b67be36f28c4 100644 --- a/velox/dwio/parquet/reader/Metadata.cpp +++ b/velox/dwio/parquet/reader/Metadata.cpp @@ -210,6 +210,15 @@ bool ColumnChunkMetaDataPtr::hasDictionaryPageOffset() const { thriftColumnChunkPtr(ptr_)->meta_data.__isset.dictionary_page_offset; } +bool ColumnChunkMetaDataPtr::hasBloomFilterOffset() const { + return hasMetadata() && + thriftColumnChunkPtr(ptr_)->meta_data.__isset.bloom_filter_offset; +} + +bool 
ColumnChunkMetaDataPtr::hasCryptoMetadata() const { + return thriftColumnChunkPtr(ptr_)->__isset.crypto_metadata; +} + std::unique_ptr ColumnChunkMetaDataPtr::getColumnStatistics( const TypePtr type, @@ -228,6 +237,11 @@ int64_t ColumnChunkMetaDataPtr::dictionaryPageOffset() const { return thriftColumnChunkPtr(ptr_)->meta_data.dictionary_page_offset; } +int64_t ColumnChunkMetaDataPtr::bloomFilterOffset() const { + VELOX_CHECK(hasBloomFilterOffset()); + return thriftColumnChunkPtr(ptr_)->meta_data.bloom_filter_offset; +} + common::CompressionKind ColumnChunkMetaDataPtr::compression() const { return thriftCodecToCompressionKind( thriftColumnChunkPtr(ptr_)->meta_data.codec); diff --git a/velox/dwio/parquet/reader/Metadata.h b/velox/dwio/parquet/reader/Metadata.h index f99d46656d8c..b32cf512fe8f 100644 --- a/velox/dwio/parquet/reader/Metadata.h +++ b/velox/dwio/parquet/reader/Metadata.h @@ -37,6 +37,12 @@ class ColumnChunkMetaDataPtr { /// Check the presence of the dictionary page offset in ColumnChunk metadata. bool hasDictionaryPageOffset() const; + // Check the presence of the bloom filter offset in ColumnChunk metadata + bool hasBloomFilterOffset() const; + + // Check the presence of crypto metadata in ColumnChunk metadata + bool hasCryptoMetadata() const; + /// Return the ColumnChunk statistics. std::unique_ptr getColumnStatistics( const TypePtr type, @@ -52,6 +58,8 @@ class ColumnChunkMetaDataPtr { /// Must check for its presence using hasDictionaryPageOffset(). int64_t dictionaryPageOffset() const; + int64_t bloomFilterOffset() const; + /// The compression. 
common::CompressionKind compression() const; diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp index 29a593da414c..875c4ac15860 100644 --- a/velox/dwio/parquet/reader/ParquetData.cpp +++ b/velox/dwio/parquet/reader/ParquetData.cpp @@ -17,15 +17,62 @@ #include "velox/dwio/parquet/reader/ParquetData.h" #include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/parquet/common/ParquetBloomFilter.h" #include "velox/dwio/parquet/reader/ParquetStatsContext.h" namespace facebook::velox::parquet { +using thrift::RowGroup; + +namespace { +bool isFilterRangeCoversStatsRange( + common::Filter* filter, + dwio::common::ColumnStatistics* stats, + const TypePtr& type) { + switch (type->kind()) { + case TypeKind::BIGINT: + case TypeKind::INTEGER: + case TypeKind::SMALLINT: + case TypeKind::TINYINT: { + auto intStats = + dynamic_cast(stats); + if (!intStats) + return false; + + int64_t min = + intStats->getMinimum().value_or(std::numeric_limits::min()); + int64_t max = + intStats->getMaximum().value_or(std::numeric_limits::max()); + + switch (filter->kind()) { + case common::FilterKind::kBigintRange: + return static_cast(filter)->lower() <= min && + max <= static_cast(filter)->upper(); + case common::FilterKind::kBigintMultiRange: { + common::BigintMultiRange* multiRangeFilter = + static_cast(filter); + auto numRanges = multiRangeFilter->ranges().size(); + if (numRanges > 0) { + return multiRangeFilter->ranges()[0]->lower() <= min && + max <= multiRangeFilter->ranges()[numRanges - 1]->upper(); + } + } break; + default: + return false; + } + } break; + default: + return false; + } + return false; +} +} // namespace + std::unique_ptr ParquetParams::toFormatData( const std::shared_ptr& type, const common::ScanSpec& /*scanSpec*/) { return std::make_unique( - type, metaData_, pool(), sessionTimezone_); + type, metaData_, pool(), sessionTimezone_, parquetReadBloomFilter_); } void ParquetData::filterRowGroups( @@ -47,6 +94,7 @@ void 
ParquetData::filterRowGroups( result.filterResult.resize(nwords); } auto metadataFiltersStartIndex = result.metadataFilterResults.size(); + for (int i = 0; i < scanSpec.numMetadataFilters(); ++i) { result.metadataFilterResults.emplace_back( scanSpec.metadataFilterNodeAt(i), std::vector(nwords)); @@ -80,12 +128,31 @@ bool ParquetData::rowGroupMatches(uint32_t rowGroupId, common::Filter* filter) { return true; } + bool needsToCheckBloomFilter = true; auto columnChunk = rowGroup.columnChunk(column); if (columnChunk.hasStatistics()) { auto columnStats = columnChunk.getColumnStatistics(type, rowGroup.numRows()); - return testFilter(filter, columnStats.get(), rowGroup.numRows(), type); + if (!testFilter(filter, columnStats.get(), rowGroup.numRows(), type)) { + return false; + } + + // We can avoid testing bloom filter unnecessarily if we know that the + // filter (min,max) range is a superset of the stats (min,max) range. For + // example, if the filter is "COL between 1 and 20" and the column stats + // range is (5,10), then we have to read the whole row group and hence avoid + // bloom filter test. 
+ needsToCheckBloomFilter = parquetReadBloomFilter_ && + !isFilterRangeCoversStatsRange(filter, columnStats.get(), type); + } + + if (needsToCheckBloomFilter && + rowGroup.columnChunk(column).hasBloomFilterOffset()) { + std::unique_ptr parquetBloomFilter = + std::make_unique(getBloomFilter(rowGroupId)); + return filter->testBloomFilter(*parquetBloomFilter, *type); } + return true; } @@ -148,4 +215,62 @@ std::pair ParquetData::getRowGroupRegion( return {fileOffset, length}; } +void ParquetData::setBloomFilterInputStream( + uint32_t rowGroupId, + dwio::common::BufferedInput& bufferedInput) { + bloomFilterInputStreams_.resize(fileMetaDataPtr_.numRowGroups()); + if (bloomFilterInputStreams_[rowGroupId] != nullptr) { + return; + } + auto rowGroup = fileMetaDataPtr_.rowGroup(rowGroupId); + auto colChunk = rowGroup.columnChunk(type_->column()); + + if (!colChunk.hasBloomFilterOffset()) { + return; + } + + VELOX_CHECK( + !colChunk.hasCryptoMetadata(), "Cannot read encrypted bloom filter yet"); + + auto bloomFilterOffset = colChunk.bloomFilterOffset(); + auto fileSize = bufferedInput.getInputStream()->getLength(); + VELOX_CHECK_GT( + fileSize, + bloomFilterOffset, + "file size {} less or equal than bloom offset {}", + fileSize, + bloomFilterOffset); + + auto id = dwio::common::StreamIdentifier(type_->column()); + bloomFilterInputStreams_[rowGroupId] = bufferedInput.enqueue( + {static_cast(bloomFilterOffset), fileSize - bloomFilterOffset}, + &id); +} + +std::shared_ptr ParquetData::getBloomFilter( + const uint32_t rowGroupId) { + auto columnBloomFilterIter = columnBloomFilterMap_.find(rowGroupId); + if (columnBloomFilterIter != columnBloomFilterMap_.end()) { + return columnBloomFilterIter->second; + } + + VELOX_CHECK_LT( + rowGroupId, + fileMetaDataPtr_.numRowGroups(), + "Invalid row group ordinal: {}", + rowGroupId); + + if (bloomFilterInputStreams_[rowGroupId] == nullptr) { + return nullptr; + } + + auto bloomFilter = BlockSplitBloomFilter::deserialize( + 
bloomFilterInputStreams_[rowGroupId].get(), pool_); + + auto blockSplitBloomFilter = + std::make_shared(std::move(bloomFilter)); + columnBloomFilterMap_[rowGroupId] = blockSplitBloomFilter; + return blockSplitBloomFilter; +} + } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index fe8020f57c65..d5cab43a5dfd 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -17,6 +17,9 @@ #pragma once #include "velox/dwio/common/BufferUtil.h" +#include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/common/ScanSpec.h" +#include "velox/dwio/parquet/common/BloomFilter.h" #include "velox/dwio/parquet/reader/Metadata.h" #include "velox/dwio/parquet/reader/PageReader.h" @@ -37,11 +40,14 @@ class ParquetParams : public dwio::common::FormatParams { dwio::common::ColumnReaderStatistics& stats, const FileMetaDataPtr metaData, const tz::TimeZone* sessionTimezone, - TimestampPrecision timestampPrecision) + TimestampPrecision timestampPrecision, + bool parquetReadBloomFilter) : FormatParams(pool, stats), metaData_(metaData), sessionTimezone_(sessionTimezone), - timestampPrecision_(timestampPrecision) {} + timestampPrecision_(timestampPrecision), + parquetReadBloomFilter_(parquetReadBloomFilter) {} + std::unique_ptr toFormatData( const std::shared_ptr& type, const common::ScanSpec& scanSpec) override; @@ -54,6 +60,7 @@ class ParquetParams : public dwio::common::FormatParams { const FileMetaDataPtr metaData_; const tz::TimeZone* sessionTimezone_; const TimestampPrecision timestampPrecision_; + bool parquetReadBloomFilter_; }; /// Format-specific data created for each leaf column of a Parquet rowgroup. 
@@ -63,14 +70,16 @@ class ParquetData : public dwio::common::FormatData { const std::shared_ptr<const dwio::common::TypeWithId>& type, const FileMetaDataPtr fileMetadataPtr, memory::MemoryPool& pool, - const tz::TimeZone* sessionTimezone) + const tz::TimeZone* sessionTimezone, + bool parquetReadBloomFilter) : pool_(pool), type_(std::static_pointer_cast<const ParquetTypeWithId>(type)), fileMetaDataPtr_(fileMetadataPtr), maxDefine_(type_->maxDefine_), maxRepeat_(type_->maxRepeat_), rowsInRowGroup_(-1), - sessionTimezone_(sessionTimezone) {} + sessionTimezone_(sessionTimezone), + parquetReadBloomFilter_(parquetReadBloomFilter) {} /// Prepares to read data for 'index'th row group. void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); @@ -90,6 +99,8 @@ class ParquetData : public dwio::common::FormatData { return reader_.get(); } + std::shared_ptr<BlockSplitBloomFilter> getBloomFilter(const uint32_t rowGroupId); + // Reads null flags for 'numValues' next top level rows. The first 'numValues' // bits of 'nulls' are set and the reader is advanced by numValues'. void readNullsOnly(int32_t numValues, BufferPtr& nulls) { @@ -200,6 +211,10 @@ class ParquetData : public dwio::common::FormatData { return true; } + void setBloomFilterInputStream( + uint32_t rowGroupId, + dwio::common::BufferedInput& bufferedInput); + // Returns the <offset, length> of the row group. std::pair<int64_t, int64_t> getRowGroupRegion(uint32_t index) const; @@ -222,6 +237,14 @@ class ParquetData : public dwio::common::FormatData { const tz::TimeZone* sessionTimezone_; std::unique_ptr<PageReader> reader_; + bool parquetReadBloomFilter_; + std::vector<std::unique_ptr<dwio::common::SeekableInputStream>> + bloomFilterInputStreams_; + + // RowGroup+Column to BloomFilter map + std::unordered_map<uint32_t, std::shared_ptr<BlockSplitBloomFilter>> + columnBloomFilterMap_; + // Nulls derived from leaf repdefs for non-leaf readers. 
BufferPtr presetNulls_; @@ -231,5 +254,4 @@ class ParquetData : public dwio::common::FormatData { // Count of leading skipped positions in 'presetNulls_' int32_t presetNullsConsumed_{0}; }; - } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 35702028c75e..b8aaa07e8455 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -17,7 +17,8 @@ #include "velox/dwio/parquet/reader/ParquetReader.h" #include //@manual - +#include "velox/dwio/common/MetricsLog.h" +#include "velox/dwio/common/TypeUtils.h" #include "velox/dwio/parquet/reader/ParquetColumnReader.h" #include "velox/dwio/parquet/reader/StructColumnReader.h" #include "velox/dwio/parquet/thrift/ThriftTransport.h" @@ -87,6 +88,10 @@ class ReaderBase { return version_; } + bool isReadBloomFilter() const { + return options_.readBloomFilter(); + } + /// Ensures that streams are enqueued and loading for the row group at /// 'currentGroup'. May start loading one or more subsequent groups. void scheduleRowGroups( @@ -952,7 +957,8 @@ class ParquetRowReader::Impl { columnReaderStats_, readerBase_->fileMetaData(), readerBase->sessionTimezone(), - options_.timestampPrecision()); + options_.timestampPrecision(), + readerBase_->isReadBloomFilter()); requestedType_ = options_.requestedType() ? 
options_.requestedType() : readerBase_->schema(); columnReader_ = ParquetColumnReader::build( @@ -971,10 +977,23 @@ class ParquetRowReader::Impl { } } + const thrift::FileMetaData& fileMetaData() const { + return readerBase_->thriftFileMetaData(); + } + void filterRowGroups() { rowGroupIds_.reserve(rowGroups_.size()); firstRowOfRowGroup_.reserve(rowGroups_.size()); + for (auto child : columnReader_->children()) { + auto& parquetData = child->formatData().as(); + for (auto i = 0; i < rowGroups_.size(); ++i) { + parquetData.setBloomFilterInputStream(i, readerBase_->bufferedInput()); + } + } + + readerBase_->bufferedInput().load(dwio::common::LogType::STRIPE_FOOTER); + ParquetData::FilterRowGroupsResult res; columnReader_->filterRowGroups(0, parquetStatsContext_, res); if (auto& metadataFilter = options_.metadataFilter()) { @@ -1158,6 +1177,10 @@ std::optional ParquetRowReader::estimatedRowSize() const { return impl_->estimatedRowSize(); } +const thrift::FileMetaData& ParquetRowReader::fileMetaData() const { + return impl_->fileMetaData(); +} + ParquetReader::ParquetReader( std::unique_ptr input, const dwio::common::ReaderOptions& options) diff --git a/velox/dwio/parquet/reader/ParquetReader.h b/velox/dwio/parquet/reader/ParquetReader.h index de6d7a9966dc..fca8f043b33e 100644 --- a/velox/dwio/parquet/reader/ParquetReader.h +++ b/velox/dwio/parquet/reader/ParquetReader.h @@ -18,8 +18,10 @@ #include "velox/dwio/common/Reader.h" #include "velox/dwio/common/ReaderFactory.h" +#include "velox/dwio/parquet/common/BloomFilter.h" #include "velox/dwio/parquet/reader/Metadata.h" #include "velox/dwio/parquet/reader/ParquetStatsContext.h" +#include "velox/dwio/parquet/reader/ParquetTypeWithId.h" namespace facebook::velox::dwio::common { @@ -36,6 +38,8 @@ class StructColumnReader; class ReaderBase; +class BloomFilterReader; + /// Implements the RowReader interface for Parquet. 
class ParquetRowReader : public dwio::common::RowReader { public: @@ -60,6 +64,8 @@ class ParquetRowReader : public dwio::common::RowReader { std::optional estimatedRowSize() const override; + const thrift::FileMetaData& fileMetaData() const; + bool allPrefetchIssued() const override { // Allow opening the next split while this is reading. return true; diff --git a/velox/dwio/parquet/tests/ParquetTestBase.h b/velox/dwio/parquet/tests/ParquetTestBase.h index 9b04ad56b500..bd0317517e0c 100644 --- a/velox/dwio/parquet/tests/ParquetTestBase.h +++ b/velox/dwio/parquet/tests/ParquetTestBase.h @@ -33,6 +33,33 @@ namespace facebook::velox::parquet { class ParquetTestBase : public testing::Test, public velox::test::VectorTestBase { + public: + static dwio::common::RowReaderOptions getReaderOpts( + const RowTypePtr& rowType, + bool fileColumnNamesReadAsLowerCase = false) { + dwio::common::RowReaderOptions rowReaderOpts; + rowReaderOpts.select( + std::make_shared( + rowType, + rowType->names(), + nullptr, + fileColumnNamesReadAsLowerCase)); + + return rowReaderOpts; + } + + static std::string getExampleFilePath(const std::string& fileName) { + return test::getDataFilePath( + "velox/dwio/parquet/tests/reader", "../examples/" + fileName); + } + + static std::shared_ptr makeScanSpec( + const RowTypePtr& rowType) { + auto scanSpec = std::make_shared(""); + scanSpec->addAllChildFields(*rowType); + return scanSpec; + } + protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); @@ -70,27 +97,6 @@ class ParquetTestBase : public testing::Test, std::move(input), opts); } - dwio::common::RowReaderOptions getReaderOpts( - const RowTypePtr& rowType, - bool fileColumnNamesReadAsLowerCase = false) { - dwio::common::RowReaderOptions rowReaderOpts; - rowReaderOpts.select( - std::make_shared( - rowType, - rowType->names(), - nullptr, - fileColumnNamesReadAsLowerCase)); - - return rowReaderOpts; - } - - std::shared_ptr makeScanSpec( - const RowTypePtr& rowType) { 
- auto scanSpec = std::make_shared(""); - scanSpec->addAllChildFields(*rowType); - return scanSpec; - } - using FilterMap = std::unordered_map>; @@ -115,7 +121,7 @@ class ParquetTestBase : public testing::Test, memory::MemoryPool& memoryPool) { uint64_t total = 0; VectorPtr result = BaseVector::create(outputType, 0, &memoryPool); - while (total < expected->size()) { + do { auto part = reader.next(1000, result); if (part > 0) { assertEqualVectorPart(expected, result, total); @@ -123,7 +129,7 @@ class ParquetTestBase : public testing::Test, } else { break; } - } + } while (total < expected->size()); EXPECT_EQ(total, expected->size()); EXPECT_EQ(reader.next(1000, result), 0); } @@ -133,7 +139,9 @@ class ParquetTestBase : public testing::Test, const std::string& /* fileName */, const RowTypePtr& fileSchema, FilterMap filters, - const RowVectorPtr& expected) { + const RowVectorPtr& expected, + std::shared_ptr + runtimeStats = nullptr) { auto scanSpec = makeScanSpec(fileSchema); for (auto&& [column, filter] : filters) { scanSpec->getOrCreateChild(velox::common::Subfield(column)) @@ -145,6 +153,9 @@ class ParquetTestBase : public testing::Test, auto rowReader = reader->createRowReader(rowReaderOpts); assertReadWithReaderAndExpected( fileSchema, *rowReader, expected, *leafPool_); + if (runtimeStats != nullptr) { + rowReader->updateRuntimeStats(*runtimeStats); + } } std::unique_ptr createSink( @@ -186,11 +197,6 @@ class ParquetTestBase : public testing::Test, return batches; } - std::string getExampleFilePath(const std::string& fileName) { - return test::getDataFilePath( - "velox/dwio/parquet/tests/reader", "../examples/" + fileName); - } - static constexpr uint64_t kRowsInRowGroup = 10'000; static constexpr uint64_t kBytesInRowGroup = 128 * 1'024 * 1'024; std::shared_ptr rootPool_; diff --git a/velox/dwio/parquet/tests/ParquetTpchTest.cpp b/velox/dwio/parquet/tests/ParquetTpchTest.cpp index 300b17b8ac2e..221ee8b23cfa 100644 --- 
a/velox/dwio/parquet/tests/ParquetTpchTest.cpp +++ b/velox/dwio/parquet/tests/ParquetTpchTest.cpp @@ -51,8 +51,8 @@ class ParquetTpchTest : public testing::Test { filesystems::registerLocalFileSystem(); dwio::common::registerFileSinks(); - parquet::registerParquetReaderFactory(); - parquet::registerParquetWriterFactory(); + facebook::velox::parquet::registerParquetReaderFactory(); + facebook::velox::parquet::registerParquetWriterFactory(); connector::registerConnectorFactory( std::make_shared()); @@ -87,8 +87,8 @@ class ParquetTpchTest : public testing::Test { connector::tpch::TpchConnectorFactory::kTpchConnectorName); connector::unregisterConnector(kHiveConnectorId); connector::unregisterConnector(kTpchConnectorId); - parquet::unregisterParquetReaderFactory(); - parquet::unregisterParquetWriterFactory(); + facebook::velox::parquet::unregisterParquetReaderFactory(); + facebook::velox::parquet::unregisterParquetWriterFactory(); } static void saveTpchTablesAsParquet() { diff --git a/velox/dwio/parquet/tests/examples/sample_int64_string_int32_bloom_1k.snappy.parquet b/velox/dwio/parquet/tests/examples/sample_int64_string_int32_bloom_1k.snappy.parquet new file mode 100644 index 000000000000..8e6beac8df0b Binary files /dev/null and b/velox/dwio/parquet/tests/examples/sample_int64_string_int32_bloom_1k.snappy.parquet differ diff --git a/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp b/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp index 91ba224d0c81..5ddbdc8b432e 100644 --- a/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp @@ -76,32 +76,32 @@ TEST_F(BloomFilterTest, BasicTest) { // Empty bloom filter deterministically returns false for (const auto v : kIntInserts) { - EXPECT_FALSE(bloomFilter.findHash(bloomFilter.hash(v))); + EXPECT_FALSE(bloomFilter.findHash(bloomFilter.hashInt64(v))); } for (const auto v : kFloatInserts) { - EXPECT_FALSE(bloomFilter.findHash(bloomFilter.hash(v))); + 
EXPECT_FALSE(bloomFilter.findHash(bloomFilter.hashFloat(v))); } // Insert all values for (const auto v : kIntInserts) { - bloomFilter.insertHash(bloomFilter.hash(v)); + bloomFilter.insertHash(bloomFilter.hashInt64(v)); } for (const auto v : kFloatInserts) { - bloomFilter.insertHash(bloomFilter.hash(v)); + bloomFilter.insertHash(bloomFilter.hashFloat(v)); } // They should always lookup successfully for (const auto v : kIntInserts) { - EXPECT_TRUE(bloomFilter.findHash(bloomFilter.hash(v))); + EXPECT_TRUE(bloomFilter.findHash(bloomFilter.hashInt64(v))); } for (const auto v : kFloatInserts) { - EXPECT_TRUE(bloomFilter.findHash(bloomFilter.hash(v))); + EXPECT_TRUE(bloomFilter.findHash(bloomFilter.hashFloat(v))); } // Values not inserted in the filter should only rarely lookup successfully int falsePositives = 0; for (const auto v : kNegativeIntLookups) { - falsePositives += bloomFilter.findHash(bloomFilter.hash(v)); + falsePositives += bloomFilter.findHash(bloomFilter.hashInt64(v)); } // (this is a crude check, see FPPTest below for a more rigorous formula) EXPECT_LE(falsePositives, 2); @@ -130,14 +130,14 @@ TEST_F(BloomFilterTest, BasicTest) { // Lookup previously inserted values for (const auto v : kIntInserts) { - EXPECT_TRUE(deBloom.findHash(deBloom.hash(v))); + EXPECT_TRUE(deBloom.findHash(deBloom.hashInt64(v))); } for (const auto v : kFloatInserts) { - EXPECT_TRUE(deBloom.findHash(deBloom.hash(v))); + EXPECT_TRUE(deBloom.findHash(deBloom.hashFloat(v))); } falsePositives = 0; for (const auto v : kNegativeIntLookups) { - falsePositives += deBloom.findHash(deBloom.hash(v)); + falsePositives += deBloom.findHash(deBloom.hashInt64(v)); } EXPECT_LE(falsePositives, 2); } @@ -185,18 +185,18 @@ TEST_F(BloomFilterTest, FPPTest) { const ByteArray byte_array( 8, reinterpret_cast(tmp.c_str())); members.push_back(tmp); - bloomFilter.insertHash(bloomFilter.hash(&byte_array)); + bloomFilter.insertHash(bloomFilter.hashByteArray(&byte_array)); } for (int i = 0; i < totalCount; i++) 
{ const ByteArray byte_array1( 8, reinterpret_cast(members[i].c_str())); - ASSERT_TRUE(bloomFilter.findHash(bloomFilter.hash(&byte_array1))); + ASSERT_TRUE(bloomFilter.findHash(bloomFilter.hashByteArray(&byte_array1))); std::string tmp = GetRandomString(7); const ByteArray byte_array2( 7, reinterpret_cast(tmp.c_str())); - if (bloomFilter.findHash(bloomFilter.hash(&byte_array2))) { + if (bloomFilter.findHash(bloomFilter.hashByteArray(&byte_array2))) { exist++; } } @@ -368,7 +368,8 @@ TEST_F(BloomFilterTest, XxHashTest) { auto hasherSeed0 = std::make_unique(); EXPECT_EQ( - HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], hasherSeed0->hash(&byteArray)) + HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], + hasherSeed0->hashByteArray(&byteArray)) << "Hash with seed 0 Error: " << i; } } @@ -386,7 +387,7 @@ TEST_F(BloomFilterTest, TestBloomFilterHashes) { auto hasherSeed0 = std::make_unique(); std::vector hashes; hashes.resize(kNumValues); - hasherSeed0->hashes( + hasherSeed0->hashesByteArray( byteArrayVector.data(), static_cast(byteArrayVector.size()), hashes.data()); diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp index 79251e2b381f..7c3cd6498297 100644 --- a/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -57,9 +57,27 @@ class ParquetReaderTest : public ParquetTestBase { const RowVectorPtr& expected) { const auto filePath(getExampleFilePath(fileName)); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + assertReadWithFilters( + fileName, fileSchema, std::move(filters), expected, readerOpts); + } + + void assertReadWithFilters( + const std::string& fileName, + const RowTypePtr& fileSchema, + FilterMap filters, + const RowVectorPtr& expected, + const facebook::velox::dwio::common::ReaderOptions& readerOpts, + std::shared_ptr + runtimeStats = nullptr) { + const auto filePath(getExampleFilePath(fileName)); auto reader = createReader(filePath, 
readerOpts); assertReadWithReaderAndFilters( - std::move(reader), fileName, fileSchema, std::move(filters), expected); + std::move(reader), + fileName, + fileSchema, + std::move(filters), + expected, + runtimeStats); } }; @@ -837,6 +855,335 @@ TEST_F(ParquetReaderTest, intMultipleFilters) { "int.parquet", intSchema(), std::move(filters), expected); } +TEST_F(ParquetReaderTest, bloomFilterBigint) { + std::string fileName = "sample_int64_string_int32_bloom_1k.snappy.parquet"; + // Using the below row from the parquet file for testing + // 7824166607706395581 | c74ddef8-b260-44f9-8889-752b0aafb2c1 | 607 + auto schema = ROW( + {"id", "i64", "uuid", "i32"}, {BIGINT(), BIGINT(), VARCHAR(), INTEGER()}); + auto expected = makeRowVector({ + makeFlatVector(std::vector{958}), + makeFlatVector(std::vector{7824166607706395581}), + makeFlatVector({"c74ddef8-b260-44f9-8889-752b0aafb2c1"}), + makeFlatVector(std::vector{607}), + }); + + facebook::velox::dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setReadBloomFilter(true); + + // Test equality filter + FilterMap bigintFilters; + bigintFilters.insert({"i64", exec::equal(7824166607706395581)}); + ParquetReaderTest::assertReadWithFilters( + fileName, schema, std::move(bigintFilters), expected, readerOpts); + + // Test IN filter with at least one value present in the column value. + // 607 is present in the column, 2000 and 4000 are not present. + bigintFilters.insert({"i64", exec::in({7824166607706395581, 2000, 4000})}); + assertReadWithFilters( + fileName, schema, std::move(bigintFilters), expected, readerOpts); + + readerOpts.setReadBloomFilter(false); + // Test IN filter with values not present in the column but are within the + // stats min/max range. With bloom filter disabled, no row groups should be + // skipped. 
+ bigintFilters.insert( + {"i64", exec::in({7824166607706395582, 7824166607706395590})}); + std::shared_ptr runtimeStats = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(bigintFilters), + makeRowVector({}), + readerOpts, + runtimeStats); + EXPECT_EQ(runtimeStats->skippedStrides, 0); + + readerOpts.setReadBloomFilter(true); + + // Test IN filter with values not present in the column but are within the + // stats min/max range. With bloom filter enabled, row groups should be + // skipped. + bigintFilters.insert( + {"i64", exec::in({7824166607706395582, 7824166607706395590})}); + std::shared_ptr runtimeStats1 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(bigintFilters), + makeRowVector({}), + readerOpts, + runtimeStats1); + EXPECT_EQ(runtimeStats1->skippedStrides, 1); + + // Test BETWEEN filter, at least one value in the range is present in the + // column. + bigintFilters.insert( + {"i64", exec::between(7824166607706395581, 7824166607706395582)}); + assertReadWithFilters( + fileName, schema, std::move(bigintFilters), expected, readerOpts); + + readerOpts.setReadBloomFilter(false); + // Test BETWEEN filter, None of the values in the range are present in the + // column but are within stats min/max range. With bloom filter disabled, no + // row groups should be skipped + bigintFilters.insert( + {"i64", exec::between(7824166607706395582, 7824166607706395590)}); + std::shared_ptr runtimeStats2 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(bigintFilters), + makeRowVector({}), + readerOpts, + runtimeStats2); + EXPECT_EQ(runtimeStats2->skippedStrides, 0); + + readerOpts.setReadBloomFilter(true); + // Test BETWEEN filter, None of the values in the range are present in the + // column but are within stats min/max range. 
With bloom filter enabled, row + // groups should be skipped + bigintFilters.insert( + {"i64", exec::between(7824166607706395582, 7824166607706395590)}); + std::shared_ptr runtimeStats3 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(bigintFilters), + makeRowVector({}), + readerOpts, + runtimeStats3); + EXPECT_EQ(runtimeStats3->skippedStrides, 1); +} + +TEST_F(ParquetReaderTest, bloomFilterString) { + std::string fileName = "sample_int64_string_int32_bloom_1k.snappy.parquet"; + // Using the below row from the parquet file for testing + // 7824166607706395581 | c74ddef8-b260-44f9-8889-752b0aafb2c1 | 607 + auto schema = ROW( + {"id", "i64", "uuid", "i32"}, {BIGINT(), BIGINT(), VARCHAR(), INTEGER()}); + auto expected = makeRowVector({ + makeFlatVector(std::vector{958}), + makeFlatVector(std::vector{7824166607706395581}), + makeFlatVector({"c74ddef8-b260-44f9-8889-752b0aafb2c1"}), + makeFlatVector(std::vector{607}), + }); + + facebook::velox::dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setReadBloomFilter(true); + + // Test equality filter + FilterMap stringFilters; + stringFilters.insert( + {"uuid", exec::equal("c74ddef8-b260-44f9-8889-752b0aafb2c1")}); + assertReadWithFilters( + fileName, schema, std::move(stringFilters), expected, readerOpts); + + // Test IN filter with at least one value present in the column + stringFilters.insert( + {"uuid", + exec::in( + {"c74ddef8-b260-44f9-8889-752b0aafb2c1", + "not_exists1", + "not_exists2"})}); + assertReadWithFilters( + fileName, schema, std::move(stringFilters), expected, readerOpts); + + readerOpts.setReadBloomFilter(false); + // Test IN filter with none of the values present in the column. + // With bloom filter disabled, no row groups should be skipped. 
+ stringFilters.insert( + {"uuid", + exec::in( + {"c74ddef8-b260-44f9-8889-notexists", + "not_exists1", + "not_exists2"})}); + std::shared_ptr runtimeStats = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(stringFilters), + makeRowVector({}), + readerOpts, + runtimeStats); + EXPECT_EQ(runtimeStats->skippedStrides, 0); + + readerOpts.setReadBloomFilter(true); + // Test IN filter with none of the values present in the column. + // With bloom filter enabled, row groups should be skipped. + stringFilters.insert( + {"uuid", + exec::in( + {"c74ddef8-b260-44f9-8889-notexists", + "not_exists1", + "not_exists2"})}); + std::shared_ptr runtimeStats1 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(stringFilters), + makeRowVector({}), + readerOpts, + runtimeStats1); + EXPECT_EQ(runtimeStats1->skippedStrides, 1); + + // Test BETWEEN filter with lower and upper being same value. Bloom filter for + // string range is applicable only when lower = upper + stringFilters.insert( + {"uuid", + exec::between( + "c74ddef8-b260-44f9-8889-752b0aafb2c1", + "c74ddef8-b260-44f9-8889-752b0aafb2c1")}); + assertReadWithFilters( + fileName, schema, std::move(stringFilters), expected, readerOpts); + + // Test BETWEEN filter with lower = upper, value is not present in the column, + // but within stats min/max range.. With bloom filter disabled, no row groups + // should be skipped. + readerOpts.setReadBloomFilter(false); + stringFilters.insert( + {"uuid", + exec::between( + "c74ddef8-b260-44f9-8889-notb0aafb2c1", + "c74ddef8-b260-44f9-8889-notb0aafb2c1")}); + std::shared_ptr runtimeStats2 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(stringFilters), + makeRowVector({}), + readerOpts, + runtimeStats2); + EXPECT_EQ(runtimeStats2->skippedStrides, 0); + + // Test BETWEEN filter with lower = upper, value is not present in the column, + // but within stats min/max range.. 
With bloom filter enabled, row groups + // should be skipped. + readerOpts.setReadBloomFilter(true); + stringFilters.insert( + {"uuid", + exec::between( + "c74ddef8-b260-44f9-8889-notb0aafb2c1", + "c74ddef8-b260-44f9-8889-notb0aafb2c1")}); + std::shared_ptr runtimeStats3 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(stringFilters), + makeRowVector({}), + readerOpts, + runtimeStats3); + EXPECT_EQ(runtimeStats3->skippedStrides, 1); +} + +TEST_F(ParquetReaderTest, bloomFilterInteger) { + std::string fileName = "sample_int64_string_int32_bloom_1k.snappy.parquet"; + // Using the below row from the parquet file for testing + // 7824166607706395581 | c74ddef8-b260-44f9-8889-752b0aafb2c1 | 607 + auto schema = ROW( + {"id", "i64", "uuid", "i32"}, {BIGINT(), BIGINT(), VARCHAR(), INTEGER()}); + auto expected = makeRowVector({ + makeFlatVector(std::vector{958}), + makeFlatVector(std::vector{7824166607706395581}), + makeFlatVector({"c74ddef8-b260-44f9-8889-752b0aafb2c1"}), + makeFlatVector(std::vector{607}), + }); + + facebook::velox::dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + readerOpts.setReadBloomFilter(true); + + FilterMap int32Filters; + // Test equality filter + int32Filters.insert({"i32", exec::equal(607)}); + assertReadWithFilters( + fileName, schema, std::move(int32Filters), expected, readerOpts); + + // Test IN filter with at least one value present in the column value. + // 607 is present in the column, 2000 and 4000 are not present. + int32Filters.insert({"i32", exec::in({607, 2000, 4000})}); + assertReadWithFilters( + fileName, schema, std::move(int32Filters), expected, readerOpts); + + readerOpts.setReadBloomFilter(false); + // Test IN filter with values not present in the column but are within the + // stats min/max range. With bloom filter disabled, no row groups should be + // skipped. 
+ int32Filters.insert({"i32", exec::in({610, 4000})}); + std::shared_ptr runtimeStats = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(int32Filters), + makeRowVector({}), + readerOpts, + runtimeStats); + EXPECT_EQ(runtimeStats->skippedStrides, 0); + + readerOpts.setReadBloomFilter(true); + + // Test IN filter with values not present in the column but are within the + // stats min/max range. With bloom filter enabled, row groups should be + // skipped. + int32Filters.insert({"i32", exec::in({610, 4000})}); + std::shared_ptr runtimeStats1 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(int32Filters), + makeRowVector({}), + readerOpts, + runtimeStats1); + EXPECT_EQ(runtimeStats1->skippedStrides, 1); + + // Test BETWEEN filter, at least one value in the range is present in the + // column. + int32Filters.insert({"i32", exec::between(607, 610)}); + assertReadWithFilters( + fileName, schema, std::move(int32Filters), expected, readerOpts); + + readerOpts.setReadBloomFilter(false); + // Test BETWEEN filter, None of the values in the range are present in the + // column but are within stats min/max range. With bloom filter disabled, no + // row groups should be skipped + int32Filters.insert({"i32", exec::between(1985, 1988)}); + std::shared_ptr runtimeStats2 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(int32Filters), + makeRowVector({}), + readerOpts, + runtimeStats2); + EXPECT_EQ(runtimeStats2->skippedStrides, 0); + + readerOpts.setReadBloomFilter(true); + // Test BETWEEN filter, None of the values in the range are present in the + // column but are within stats min/max range. 
With bloom filter enabled, row + // groups should be skipped + int32Filters.insert({"i32", exec::between(1985, 1988)}); + std::shared_ptr runtimeStats3 = + std::make_shared(); + assertReadWithFilters( + fileName, + schema, + std::move(int32Filters), + makeRowVector({}), + readerOpts, + runtimeStats3); + EXPECT_EQ(runtimeStats3->skippedStrides, 1); +} + TEST_F(ParquetReaderTest, doubleFilters) { // Read sample.parquet with the double filter "b < 10.0". FilterMap filters; diff --git a/velox/type/Filter.cpp b/velox/type/Filter.cpp index d703946c6807..b556e096dc83 100644 --- a/velox/type/Filter.cpp +++ b/velox/type/Filter.cpp @@ -280,6 +280,35 @@ bool BigintRange::testingEquals(const Filter& other) const { (upper_ == otherBigintRange->upper_); } +bool BigintRange::testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const { + // Don't test bloom filter for a wider range + if ((upper_ - lower_) > kMaxBloomFilterChecks) + return true; + + switch (type.kind()) { + case TypeKind::INTEGER: + for (int32_t val = lower32_; val <= upper32_; ++val) { + if (bloomFilter.mightContainInt32(val)) { + return true; + } + } + break; + case TypeKind::BIGINT: + for (int64_t val = lower_; val <= upper_; ++val) { + if (bloomFilter.mightContainInt64(val)) { + return true; + } + } + break; + default: + return true; + } + + return false; +} + folly::dynamic NegatedBigintRange::serialize() const { auto obj = Filter::serializeBase("NegatedBigintRange"); obj["lower"] = nonNegated_->lower(); @@ -1846,6 +1875,7 @@ std::unique_ptr BigintRange::mergeWith(const Filter* other) const { case FilterKind::kNegatedBigintValuesUsingBitmask: case FilterKind::kNegatedBigintValuesUsingHashTable: { bool bothNullAllowed = nullAllowed_ && other->testNull(); + bool unused = false; if (!other->testInt64Range(lower_, upper_, false)) { return nullOrFalse(bothNullAllowed); } diff --git a/velox/type/Filter.h b/velox/type/Filter.h index f46b26c5f12a..1f95c8ff056c 100644 --- 
a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -60,6 +60,7 @@ enum class FilterKind { }; class Filter; +class AbstractBloomFilter; using FilterPtr = std::unique_ptr<Filter>; using SubfieldFilters = std::unordered_map<common::Subfield, std::unique_ptr<Filter>>; @@ -73,6 +74,8 @@ class Filter : public velox::ISerializable { Filter(bool deterministic, bool nullAllowed, FilterKind kind) : nullAllowed_(nullAllowed), deterministic_(deterministic), kind_(kind) {} + static constexpr int kMaxBloomFilterChecks = 10; + public: virtual ~Filter() = default; @@ -83,6 +86,12 @@ class Filter : public velox::ISerializable { // runtime. static constexpr bool deterministic = true; + virtual bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const { + return true; + } + FilterKind kind() const { return kind_; } @@ -293,6 +302,14 @@ class Filter : public velox::ISerializable { } }; +class AbstractBloomFilter { + public: + virtual bool mightContainInt32(int32_t value) const = 0; + virtual bool mightContainInt64(int64_t value) const = 0; + virtual bool mightContainString(const std::string& value) const = 0; + virtual ~AbstractBloomFilter() = default; +}; + /// TODO Check if this filter is needed. This should not be passed down. class AlwaysFalse final : public Filter { public: @@ -780,6 +797,10 @@ class BigintRange final : public Filter { return !(min > upper_ || max < lower_); } + bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const override; + int64_t lower() const { return lower_; } @@ -983,6 +1004,30 @@ class BigintValuesUsingHashTable final : public Filter { } } + bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const override { + // Limit checks to IN-list with up to 10 values. + if (values_.size() > kMaxBloomFilterChecks) { + return true; + } + // For IN-list, if any value matches, return true. 
+ for (auto i = 0; i < values_.size() && i < 10; ++i) { + if (type.kind() == TypeKind::INTEGER) { + int32_t val = values_[i]; + if (bloomFilter.mightContainInt32(val)) { + return true; + } + } else if ( + type.kind() == TypeKind::BIGINT && + bloomFilter.mightContainInt64(values_[i])) { + return true; + } + } + + return false; + } + bool testInt64(int64_t value) const final; xsimd::batch_bool testValues(xsimd::batch) const final; xsimd::batch_bool testValues(xsimd::batch) const final; @@ -1123,6 +1168,31 @@ class BigintValuesUsingBitmask final : public Filter { bool testingEquals(const Filter& other) const final; + bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const override { + // Limit checks to only maxBloomFilterChecks values + if (bitmask_.size() > kMaxBloomFilterChecks) { + return true; + } + + for (int i = 0; i < bitmask_.size(); i++) { + if (bitmask_[i]) { + int64_t val = min_ + i; + // For IN-list, if any value matches, return true. + if (type.kind() == TypeKind::INTEGER && + bloomFilter.mightContainInt32((int32_t)val)) { + return true; + } else if ( + type.kind() == TypeKind::BIGINT && + bloomFilter.mightContainInt64(val)) { + return true; + } + } + } + return false; + } + private: std::unique_ptr mergeWith(int64_t min, int64_t max, const Filter* other) const; @@ -1660,6 +1730,15 @@ class BytesRange final : public AbstractRange { } } + bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const override { + if (!singleValue_) { + return true; + } + return bloomFilter.mightContainString(lower_); + } + std::string toString() const override { return fmt::format( "BytesRange: {}{}, {}{} {}", @@ -1944,6 +2023,23 @@ class BytesValues final : public Filter { } } + bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const override { + // For IN-list, if any value matches, return true. + // Limit checks to IN-list with only 10 values. 
+ if (values_.size() > kMaxBloomFilterChecks) { + return true; + } + + for (auto it = values_.begin(); it != values_.end(); ++it) { + if (bloomFilter.mightContainString(*it)) { + return true; + } + } + return false; + } + bool testLength(int32_t length) const final { return lengths_.contains(length); } @@ -1994,6 +2090,29 @@ class BigintMultiRange final : public Filter { std::unique_ptr clone( std::optional nullAllowed = std::nullopt) const final; + bool testBloomFilter( + const AbstractBloomFilter& bloomFilter, + const velox::Type& type) const override { + int numRange = 0; + // Limit number of values to check to maxBloomFilterChecks. + std::for_each( + ranges_.begin(), + ranges_.end(), + [this, &numRange](const std::unique_ptr& range) { + numRange = numRange + range->upper() - range->lower() + 1; + }); + if (numRange > kMaxBloomFilterChecks) { + return true; + } + + for (auto& range : ranges_) { + if (range->testBloomFilter(bloomFilter, type)) { + return true; + } + } + return false; + } + bool testInt64(int64_t value) const final; bool testInt64Range(int64_t min, int64_t max, bool hasNull) const final;