Skip to content

Commit

Permalink
Add parquet bloom filter read support for int,bigint,string columns
Browse files Browse the repository at this point in the history
  • Loading branch information
nmahadevuni committed Mar 6, 2025
1 parent 9a5946a commit bea99c9
Show file tree
Hide file tree
Showing 24 changed files with 936 additions and 147 deletions.
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ bool HiveConfig::isFileColumnNamesReadAsLowerCase(
config_->get<bool>(kFileColumnNamesReadAsLowerCase, false));
}

bool HiveConfig::isParquetReadBloomFilter(
const config::ConfigBase* session) const {
return session->get<bool>(
kParquetReadBloomFilterSession,
config_->get<bool>(kParquetReadBloomFilter, false));
}

bool HiveConfig::isPartitionPathAsLowerCase(
const config::ConfigBase* session) const {
return session->get<bool>(kPartitionPathAsLowerCaseSession, true);
Expand Down
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class HiveConfig {
static constexpr const char* kParquetUseColumnNamesSession =
"parquet_use_column_names";

// Read bloom filters from parquet files to filter row groups.
static constexpr const char* kParquetReadBloomFilter =
"hive.parquet.read-bloom-filter";

static constexpr const char* kParquetReadBloomFilterSession =
"hive_parquet_read_bloom_filter";

/// Reads the source file column name as lower case.
static constexpr const char* kFileColumnNamesReadAsLowerCase =
"file-column-names-read-as-lower-case";
Expand Down Expand Up @@ -199,6 +206,8 @@ class HiveConfig {
bool isFileColumnNamesReadAsLowerCase(
const config::ConfigBase* session) const;

bool isParquetReadBloomFilter(const config::ConfigBase* session) const;

bool isPartitionPathAsLowerCase(const config::ConfigBase* session) const;

bool allowNullPartitionKeys(const config::ConfigBase* session) const;
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ void configureReaderOptions(

readerOptions.setFileFormat(hiveSplit->fileFormat);
}

if (readerOptions.fileFormat() == dwio::common::FileFormat::PARQUET) {
readerOptions.setReadBloomFilter(
hiveConfig->isParquetReadBloomFilter(sessionProperties));
}
}

void configureRowReaderOptions(
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/tests/HiveConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ TEST(HiveConfigTest, defaultConfig) {
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
ASSERT_FALSE(hiveConfig.isOrcUseColumnNames(emptySession.get()));
ASSERT_FALSE(hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()));

ASSERT_FALSE(hiveConfig.isParquetReadBloomFilter(emptySession.get()));
ASSERT_EQ(hiveConfig.maxCoalescedBytes(emptySession.get()), 128 << 20);
ASSERT_EQ(
hiveConfig.maxCoalescedDistanceBytes(emptySession.get()), 512 << 10);
Expand All @@ -64,6 +64,7 @@ TEST(HiveConfigTest, overrideConfig) {
{HiveConfig::kGcsCredentialsPath, "hey"},
{HiveConfig::kOrcUseColumnNames, "true"},
{HiveConfig::kFileColumnNamesReadAsLowerCase, "true"},
{HiveConfig::kParquetReadBloomFilter, "true"},
{HiveConfig::kAllowNullPartitionKeys, "false"},
{HiveConfig::kMaxCoalescedBytes, "100"},
{HiveConfig::kMaxCoalescedDistance, "100kB"},
Expand Down Expand Up @@ -92,6 +93,7 @@ TEST(HiveConfigTest, overrideConfig) {
ASSERT_EQ(hiveConfig.maxCoalescedBytes(emptySession.get()), 100);
ASSERT_EQ(
hiveConfig.maxCoalescedDistanceBytes(emptySession.get()), 100 << 10);
ASSERT_TRUE(hiveConfig.isParquetReadBloomFilter(emptySession.get()));
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 100);
ASSERT_FALSE(hiveConfig.isFileHandleCacheEnabled());
ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(emptySession.get()), 100);
Expand Down
10 changes: 10 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ class ReaderOptions : public io::ReaderOptions {
return *this;
}

ReaderOptions& setReadBloomFilter(bool flag) {
readBloomFilter_ = flag;
return *this;
}

ReaderOptions& setIOExecutor(std::shared_ptr<folly::Executor> executor) {
ioExecutor_ = std::move(executor);
return *this;
Expand Down Expand Up @@ -567,6 +572,10 @@ class ReaderOptions : public io::ReaderOptions {
return useColumnNamesForColumnMapping_;
}

bool readBloomFilter() const {
return readBloomFilter_;
}

const std::shared_ptr<random::RandomSkipTracker>& randomSkip() const {
return randomSkip_;
}
Expand Down Expand Up @@ -609,6 +618,7 @@ class ReaderOptions : public io::ReaderOptions {
uint64_t filePreloadThreshold_{kDefaultFilePreloadThreshold};
bool fileColumnNamesReadAsLowerCase_{false};
bool useColumnNamesForColumnMapping_{false};
bool readBloomFilter_{false};
std::shared_ptr<folly::Executor> ioExecutor_;
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
Expand Down
83 changes: 36 additions & 47 deletions velox/dwio/parquet/common/BloomFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,31 @@ class BloomFilter {
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(int32_t value) const = 0;
virtual uint64_t hashInt32(int32_t value) const = 0;

/// Compute hash for 64 bits value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(int64_t value) const = 0;
virtual uint64_t hashInt64(int64_t value) const = 0;

/// Compute hash for float value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(float value) const = 0;
virtual uint64_t hashFloat(float value) const = 0;

/// Compute hash for double value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(double value) const = 0;
virtual uint64_t hashDouble(double value) const = 0;

/// Compute hash for bytearray by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(const ByteArray* value) const = 0;
virtual uint64_t hashByteArray(const ByteArray* value) const = 0;

/// Batch compute hashes for 32 bits values by using its plain encoding
/// result.
Expand All @@ -101,8 +101,8 @@ class BloomFilter {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const int32_t* values, int numValues, uint64_t* hashes)
const = 0;
virtual void
hashesInt32(const int32_t* values, int numValues, uint64_t* hashes) const = 0;

/// Batch compute hashes for 64 bits values by using its plain encoding
/// result.
Expand All @@ -111,16 +111,16 @@ class BloomFilter {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const int64_t* values, int numValues, uint64_t* hashes)
const = 0;
virtual void
hashesInt64(const int64_t* values, int numValues, uint64_t* hashes) const = 0;

/// Batch compute hashes for float values by using its plain encoding result.
///
/// @param values values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const float* values, int numValues, uint64_t* hashes)
virtual void hashesFloat(const float* values, int numValues, uint64_t* hashes)
const = 0;

/// Batch compute hashes for double values by using its plain encoding result.
Expand All @@ -129,8 +129,8 @@ class BloomFilter {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const double* values, int numValues, uint64_t* hashes)
const = 0;
virtual void
hashesDouble(const double* values, int numValues, uint64_t* hashes) const = 0;

/// Batch compute hashes for bytearray values by using its plain encoding
/// result.
Expand All @@ -139,8 +139,10 @@ class BloomFilter {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const ByteArray* values, int numValues, uint64_t* hashes)
const = 0;
virtual void hashesByteArray(
const ByteArray* values,
int numValues,
uint64_t* hashes) const = 0;

virtual ~BloomFilter() = default;

Expand Down Expand Up @@ -248,54 +250,41 @@ class BlockSplitBloomFilter : public BloomFilter {
return numBytes_;
}

uint64_t hash(int32_t value) const override {
return hasher_->hash(value);
uint64_t hashInt32(int32_t value) const override {
return hasher_->hashInt32(value);
}
uint64_t hash(int64_t value) const override {
return hasher_->hash(value);
uint64_t hashInt64(int64_t value) const override {
return hasher_->hashInt64(value);
}
uint64_t hash(float value) const override {
return hasher_->hash(value);
uint64_t hashFloat(float value) const override {
return hasher_->hashFloat(value);
}
uint64_t hash(double value) const override {
return hasher_->hash(value);
uint64_t hashDouble(double value) const override {
return hasher_->hashDouble(value);
}
uint64_t hash(const ByteArray* value) const override {
return hasher_->hash(value);
uint64_t hashByteArray(const ByteArray* value) const override {
return hasher_->hashByteArray(value);
}

void hashes(const int32_t* values, int numValues, uint64_t* hashes)
void hashesInt32(const int32_t* values, int numValues, uint64_t* hashes)
const override {
hasher_->hashes(values, numValues, hashes);
hasher_->hashesInt32(values, numValues, hashes);
}
void hashes(const int64_t* values, int numValues, uint64_t* hashes)
void hashesInt64(const int64_t* values, int numValues, uint64_t* hashes)
const override {
hasher_->hashes(values, numValues, hashes);
hasher_->hashesInt64(values, numValues, hashes);
}
void hashes(const float* values, int numValues, uint64_t* hashes)
void hashesFloat(const float* values, int numValues, uint64_t* hashes)
const override {
hasher_->hashes(values, numValues, hashes);
hasher_->hashesFloat(values, numValues, hashes);
}
void hashes(const double* values, int numValues, uint64_t* hashes)
void hashesDouble(const double* values, int numValues, uint64_t* hashes)
const override {
hasher_->hashes(values, numValues, hashes);
hasher_->hashesDouble(values, numValues, hashes);
}
void hashes(const ByteArray* values, int numValues, uint64_t* hashes)
void hashesByteArray(const ByteArray* values, int numValues, uint64_t* hashes)
const override {
hasher_->hashes(values, numValues, hashes);
}

uint64_t hash(const int32_t* value) const {
return hasher_->hash(*value);
}
uint64_t hash(const int64_t* value) const {
return hasher_->hash(*value);
}
uint64_t hash(const float* value) const {
return hasher_->hash(*value);
}
uint64_t hash(const double* value) const {
return hasher_->hash(*value);
hasher_->hashesByteArray(values, numValues, hashes);
}

/// Deserialize the Bloom filter from an input stream. It is used when
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ velox_add_library(
BloomFilter.cpp
XxHasher.cpp
LevelComparison.cpp
LevelConversion.cpp)
LevelConversion.cpp
ParquetBloomFilter.h)

velox_link_libraries(
velox_dwio_parquet_common
Expand Down
38 changes: 23 additions & 15 deletions velox/dwio/parquet/common/Hasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,31 @@ class Hasher {
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(int32_t value) const = 0;
virtual uint64_t hashInt32(int32_t value) const = 0;

/// Compute hash for 64 bits value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(int64_t value) const = 0;
virtual uint64_t hashInt64(int64_t value) const = 0;

/// Compute hash for float value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(float value) const = 0;
virtual uint64_t hashFloat(float value) const = 0;

/// Compute hash for double value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(double value) const = 0;
virtual uint64_t hashDouble(double value) const = 0;

/// Compute hash for ByteArray value by using its plain encoding result.
///
/// @param value the value to hash.
/// @return hash result.
virtual uint64_t hash(const ByteArray* value) const = 0;
virtual uint64_t hashByteArray(const ByteArray* value) const = 0;

/// Batch compute hashes for 32 bits values by using its plain encoding
/// result.
Expand All @@ -80,8 +80,10 @@ class Hasher {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const int32_t* values, int num_values, uint64_t* hashes)
const = 0;
virtual void hashesInt32(
const int32_t* values,
int num_values,
uint64_t* hashes) const = 0;

/// Batch compute hashes for 64 bits values by using its plain encoding
/// result.
Expand All @@ -90,26 +92,30 @@ class Hasher {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const int64_t* values, int num_values, uint64_t* hashes)
const = 0;
virtual void hashesInt64(
const int64_t* values,
int num_values,
uint64_t* hashes) const = 0;

/// Batch compute hashes for float values by using its plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const float* values, int num_values, uint64_t* hashes)
const = 0;
virtual void
hashesFloat(const float* values, int num_values, uint64_t* hashes) const = 0;

/// Batch compute hashes for double values by using its plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const double* values, int num_values, uint64_t* hashes)
const = 0;
virtual void hashesDouble(
const double* values,
int num_values,
uint64_t* hashes) const = 0;

/// Batch compute hashes for ByteArray values by using its plain encoding
/// result.
Expand All @@ -118,8 +124,10 @@ class Hasher {
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values, its length should be
/// equal to num_values.
virtual void hashes(const ByteArray* values, int num_values, uint64_t* hashes)
const = 0;
virtual void hashesByteArray(
const ByteArray* values,
int num_values,
uint64_t* hashes) const = 0;

virtual ~Hasher() = default;
};
Expand Down
Loading

0 comments on commit bea99c9

Please sign in to comment.