Skip to content

Commit 5cafa5d

Browse files
committed
Add parquet bloom filter read support for int,bigint,string columns
1 parent 25a19c9 commit 5cafa5d

33 files changed

+873
-65
lines changed
+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
#include <string>
21+
22+
class AbstractBloomFilter {
23+
public:
24+
virtual bool mightContain(int32_t value) const = 0;
25+
virtual bool mightContain(int64_t value) const = 0;
26+
virtual bool mightContain(const std::string& value) const = 0;
27+
virtual ~AbstractBloomFilter() = default;
28+
};

velox/common/base/BitUtil.h

+4
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,10 @@ void storeBitsToByte(uint8_t bits, uint8_t* bytes, unsigned index) {
998998
}
999999
}
10001000

1001+
constexpr bool IsMultipleOf8(int64_t n) {
1002+
return (n & 7) == 0;
1003+
}
1004+
10011005
} // namespace bits
10021006
} // namespace velox
10031007
} // namespace facebook

velox/connectors/hive/HiveConfig.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ bool HiveConfig::isFileColumnNamesReadAsLowerCase(
168168
config_->get<bool>(kFileColumnNamesReadAsLowerCase, false));
169169
}
170170

171+
bool HiveConfig::isParquetReadBloomFilter(const config::ConfigBase* session) {
172+
return session->get<bool>(kParquetReadBloomFilter, false);
173+
}
174+
171175
bool HiveConfig::isPartitionPathAsLowerCase(
172176
const config::ConfigBase* session) const {
173177
return session->get<bool>(kPartitionPathAsLowerCaseSession, true);

velox/connectors/hive/HiveConfig.h

+5
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ class HiveConfig {
126126
static constexpr const char* kOrcUseColumnNamesSession =
127127
"hive_orc_use_column_names";
128128

129+
static constexpr const char* kParquetReadBloomFilter =
130+
"hive.parquet.read-bloom-filter";
131+
129132
/// Reads the source file column name as lower case.
130133
static constexpr const char* kFileColumnNamesReadAsLowerCase =
131134
"file-column-names-read-as-lower-case";
@@ -295,6 +298,8 @@ class HiveConfig {
295298
bool isFileColumnNamesReadAsLowerCase(
296299
const config::ConfigBase* session) const;
297300

301+
bool isParquetReadBloomFilter(const config::ConfigBase* session);
302+
298303
bool isPartitionPathAsLowerCase(const config::ConfigBase* session) const;
299304

300305
bool allowNullPartitionKeys(const config::ConfigBase* session) const;

velox/connectors/hive/HiveConnector.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,21 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
7373
std::string,
7474
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
7575
ConnectorQueryCtx* connectorQueryCtx) {
76+
dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool());
77+
options.setMaxCoalesceBytes(hiveConfig_->maxCoalescedBytes());
78+
options.setMaxCoalesceDistance(hiveConfig_->maxCoalescedDistanceBytes());
79+
options.setPrefetchRowGroups(hiveConfig_->prefetchRowGroups());
80+
options.setLoadQuantum(hiveConfig_->loadQuantum());
81+
options.setFileColumnNamesReadAsLowerCase(
82+
hiveConfig_->isFileColumnNamesReadAsLowerCase(
83+
connectorQueryCtx->sessionProperties()));
84+
options.setUseColumnNamesForColumnMapping(
85+
hiveConfig_->isOrcUseColumnNames(connectorQueryCtx->sessionProperties()));
86+
options.setFooterEstimatedSize(hiveConfig_->footerEstimatedSize());
87+
options.setFilePreloadThreshold(hiveConfig_->filePreloadThreshold());
88+
options.setParquetReadBloomFilter(hiveConfig_->isParquetReadBloomFilter(
89+
connectorQueryCtx->sessionProperties()));
90+
7691
return std::make_unique<HiveDataSource>(
7792
outputType,
7893
tableHandle,

velox/dwio/common/FormatData.h

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ class FormatData {
104104
/// file.
105105
virtual void filterRowGroups(
106106
const velox::common::ScanSpec& scanSpec,
107+
dwio::common::BufferedInput& bufferedInput,
107108
uint64_t rowsPerRowGroup,
108109
const StatsContext& writerContext,
109110
FilterRowGroupsResult&) = 0;

velox/dwio/common/Options.h

+10
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,11 @@ class ReaderOptions : public io::ReaderOptions {
492492
return *this;
493493
}
494494

495+
ReaderOptions& setParquetReadBloomFilter(bool flag) {
496+
parquetReadBloomFilter_ = flag;
497+
return *this;
498+
}
499+
495500
ReaderOptions& setIOExecutor(std::shared_ptr<folly::Executor> executor) {
496501
ioExecutor_ = std::move(executor);
497502
return *this;
@@ -553,6 +558,10 @@ class ReaderOptions : public io::ReaderOptions {
553558
return useColumnNamesForColumnMapping_;
554559
}
555560

561+
bool parquetReadBloomFilter() const {
562+
return parquetReadBloomFilter_;
563+
}
564+
556565
const std::shared_ptr<random::RandomSkipTracker>& randomSkip() const {
557566
return randomSkip_;
558567
}
@@ -587,6 +596,7 @@ class ReaderOptions : public io::ReaderOptions {
587596
uint64_t filePreloadThreshold_{kDefaultFilePreloadThreshold};
588597
bool fileColumnNamesReadAsLowerCase_{false};
589598
bool useColumnNamesForColumnMapping_{false};
599+
bool parquetReadBloomFilter_{false};
590600
std::shared_ptr<folly::Executor> ioExecutor_;
591601
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
592602
std::shared_ptr<velox::common::ScanSpec> scanSpec_;

velox/dwio/common/SelectiveColumnReader.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ SelectiveColumnReader::SelectiveColumnReader(
5555
void SelectiveColumnReader::filterRowGroups(
5656
uint64_t rowGroupSize,
5757
const dwio::common::StatsContext& context,
58-
FormatData::FilterRowGroupsResult& result) const {
59-
formatData_->filterRowGroups(*scanSpec_, rowGroupSize, context, result);
58+
FormatData::FilterRowGroupsResult& result,
59+
dwio::common::BufferedInput& bufferedInput) const {
60+
formatData_->filterRowGroups(
61+
*scanSpec_, bufferedInput, rowGroupSize, context, result);
6062
}
6163

6264
const std::vector<SelectiveColumnReader*>& SelectiveColumnReader::children()

velox/dwio/common/SelectiveColumnReader.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "velox/common/memory/Memory.h"
2020
#include "velox/common/process/ProcessBase.h"
2121
#include "velox/common/process/TraceHistory.h"
22+
#include "velox/dwio/common/BufferedInput.h"
2223
#include "velox/dwio/common/FormatData.h"
2324
#include "velox/dwio/common/IntDecoder.h"
2425
#include "velox/dwio/common/Mutation.h"
@@ -389,7 +390,8 @@ class SelectiveColumnReader {
389390
virtual void filterRowGroups(
390391
uint64_t rowGroupSize,
391392
const dwio::common::StatsContext& context,
392-
FormatData::FilterRowGroupsResult&) const;
393+
FormatData::FilterRowGroupsResult&,
394+
BufferedInput& bufferedInput) const;
393395

394396
raw_vector<int32_t>& innerNonNullRows() {
395397
return innerNonNullRows_;

velox/dwio/common/SelectiveStructColumnReader.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ namespace facebook::velox::dwio::common {
2424
void SelectiveStructColumnReaderBase::filterRowGroups(
2525
uint64_t rowGroupSize,
2626
const dwio::common::StatsContext& context,
27-
FormatData::FilterRowGroupsResult& result) const {
28-
SelectiveColumnReader::filterRowGroups(rowGroupSize, context, result);
27+
FormatData::FilterRowGroupsResult& result,
28+
dwio::common::BufferedInput& bufferedInput) const {
29+
SelectiveColumnReader::filterRowGroups(
30+
rowGroupSize, context, result, bufferedInput);
2931
for (const auto& child : children_) {
30-
child->filterRowGroups(rowGroupSize, context, result);
32+
child->filterRowGroups(rowGroupSize, context, result, bufferedInput);
3133
}
3234
}
3335

velox/dwio/common/SelectiveStructColumnReader.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
3838
void filterRowGroups(
3939
uint64_t rowGroupSize,
4040
const dwio::common::StatsContext& context,
41-
FormatData::FilterRowGroupsResult&) const override;
41+
FormatData::FilterRowGroupsResult&,
42+
dwio::common::BufferedInput& bufferedInput) const override;
4243

4344
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
4445
override;

velox/dwio/dwrf/reader/DwrfData.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ void DwrfData::readNulls(
138138

139139
void DwrfData::filterRowGroups(
140140
const common::ScanSpec& scanSpec,
141+
dwio::common::BufferedInput& bufferedInput,
141142
uint64_t rowGroupSize,
142143
const dwio::common::StatsContext& writerContext,
143144
FilterRowGroupsResult& result) {

velox/dwio/dwrf/reader/DwrfData.h

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#pragma once
1818

1919
#include "velox/common/memory/Memory.h"
20+
#include "velox/dwio/common/BufferedInput.h"
2021
#include "velox/dwio/common/FormatData.h"
2122
#include "velox/dwio/common/TypeWithId.h"
2223
#include "velox/dwio/common/compression/Compression.h"
@@ -52,6 +53,7 @@ class DwrfData : public dwio::common::FormatData {
5253

5354
void filterRowGroups(
5455
const common::ScanSpec& scanSpec,
56+
dwio::common::BufferedInput& bufferedInput,
5557
uint64_t rowsPerRowGroup,
5658
const dwio::common::StatsContext& writerContext,
5759
FilterRowGroupsResult&) override;

velox/dwio/dwrf/reader/DwrfReader.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,8 @@ void DwrfRowReader::checkSkipStrides(uint64_t strideSize) {
485485
StatsContext context(
486486
getReader().getWriterName(), getReader().getWriterVersion());
487487
DwrfData::FilterRowGroupsResult res;
488-
getSelectiveColumnReader()->filterRowGroups(strideSize, context, res);
488+
getSelectiveColumnReader()->filterRowGroups(
489+
strideSize, context, res, readerBaseShared()->getBufferedInput());
489490
if (auto& metadataFilter = options_.getMetadataFilter()) {
490491
metadataFilter->eval(res.metadataFilterResults, res.filterResult);
491492
}

velox/dwio/parquet/common/CMakeLists.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
add_library(velox_dwio_native_parquet_common BloomFilter.cpp XxHasher.cpp)
15+
add_library(velox_dwio_native_parquet_common BloomFilter.cpp XxHasher.cpp
16+
ParquetBloomFilter.h)
1617

1718
target_link_libraries(
1819
velox_dwio_native_parquet_common
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/common/base/AbstractBloomFilter.h"
20+
#include "velox/dwio/parquet/common/BloomFilter.h"
21+
22+
namespace facebook::velox::parquet {
23+
24+
class ParquetBloomFilter final : public AbstractBloomFilter {
25+
public:
26+
ParquetBloomFilter(
27+
std::shared_ptr<facebook::velox::parquet::BloomFilter> bloomFilter)
28+
: bloomFilter_(bloomFilter) {}
29+
30+
bool mightContain(int32_t value) const override {
31+
return bloomFilter_->findHash(bloomFilter_->hash(value));
32+
}
33+
34+
bool mightContain(int64_t value) const override {
35+
return bloomFilter_->findHash(bloomFilter_->hash(value));
36+
}
37+
38+
bool mightContain(const std::string& value) const override {
39+
ByteArray byteArray{value};
40+
return bloomFilter_->findHash(bloomFilter_->hash(&byteArray));
41+
}
42+
43+
private:
44+
std::shared_ptr<facebook::velox::parquet::BloomFilter> bloomFilter_;
45+
};
46+
47+
} // namespace facebook::velox::parquet
48+

velox/dwio/parquet/reader/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ velox_link_libraries(
3232
velox_type
3333
velox_dwio_common
3434
velox_dwio_common_compression
35+
velox_dwio_native_parquet_common
3536
fmt::fmt
3637
arrow
3738
Snappy::snappy

velox/dwio/parquet/reader/Metadata.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,15 @@ bool ColumnChunkMetaDataPtr::hasDictionaryPageOffset() const {
210210
thriftColumnChunkPtr(ptr_)->meta_data.__isset.dictionary_page_offset;
211211
}
212212

213+
bool ColumnChunkMetaDataPtr::hasBloomFilterOffset() const {
214+
return hasMetadata() &&
215+
thriftColumnChunkPtr(ptr_)->meta_data.__isset.bloom_filter_offset;
216+
}
217+
218+
bool ColumnChunkMetaDataPtr::hasCryptoMetadata() const {
219+
return thriftColumnChunkPtr(ptr_)->__isset.crypto_metadata;
220+
}
221+
213222
std::unique_ptr<dwio::common::ColumnStatistics>
214223
ColumnChunkMetaDataPtr::getColumnStatistics(
215224
const TypePtr type,
@@ -228,6 +237,11 @@ int64_t ColumnChunkMetaDataPtr::dictionaryPageOffset() const {
228237
return thriftColumnChunkPtr(ptr_)->meta_data.dictionary_page_offset;
229238
}
230239

240+
int64_t ColumnChunkMetaDataPtr::bloomFilterOffset() const {
241+
VELOX_CHECK(hasBloomFilterOffset());
242+
return thriftColumnChunkPtr(ptr_)->meta_data.bloom_filter_offset;
243+
}
244+
231245
common::CompressionKind ColumnChunkMetaDataPtr::compression() const {
232246
return thriftCodecToCompressionKind(
233247
thriftColumnChunkPtr(ptr_)->meta_data.codec);

velox/dwio/parquet/reader/Metadata.h

+8
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ class ColumnChunkMetaDataPtr {
3737
/// Check the presence of the dictionary page offset in ColumnChunk metadata.
3838
bool hasDictionaryPageOffset() const;
3939

40+
// Check the presence of the bloom filter offset in ColumnChunk metadata
41+
bool hasBloomFilterOffset() const;
42+
43+
// Check the presence of crypto metadata in ColumnChunk metadata
44+
bool hasCryptoMetadata() const;
45+
4046
/// Return the ColumnChunk statistics.
4147
std::unique_ptr<dwio::common::ColumnStatistics> getColumnStatistics(
4248
const TypePtr type,
@@ -52,6 +58,8 @@ class ColumnChunkMetaDataPtr {
5258
/// Must check for its presence using hasDictionaryPageOffset().
5359
int64_t dictionaryPageOffset() const;
5460

61+
int64_t bloomFilterOffset() const;
62+
5563
/// The compression.
5664
common::CompressionKind compression() const;
5765

0 commit comments

Comments
 (0)