Skip to content

Commit 98e57bf

Browse files
committed
Move arrow parquet bloom filter code to velox dwio
1 parent 1c538f2 commit 98e57bf

File tree

9 files changed

+1311
-4
lines changed

9 files changed

+1311
-4
lines changed

velox/dwio/parquet/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
if(VELOX_ENABLE_PARQUET)
1616
add_subdirectory(thrift)
17+
add_subdirectory(common)
1718
add_subdirectory(reader)
1819
add_subdirectory(writer)
1920

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Adapted from Apache Arrow.
18+
19+
#include "BloomFilter.h"
20+
#include "XxHasher.h"
21+
#include "velox/dwio/parquet/thrift/ThriftTransport.h"
22+
23+
#include <thrift/protocol/TCompactProtocol.h>
24+
#include <thrift/transport/TBufferTransports.h>
25+
26+
#include <cstdint>
27+
#include <cstring>
28+
#include <memory>
29+
30+
namespace facebook::velox::parquet {
31+
32+
// Out-of-class definition of the static constexpr SALT array (its initializer
// is presumably in the class declaration in BloomFilter.h). Before C++17 such
// a definition is required whenever the member is ODR-used; from C++17 on it
// is redundant but still permitted.
constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
33+
34+
BlockSplitBloomFilter::BlockSplitBloomFilter(memory::MemoryPool* pool)
35+
: pool_(pool),
36+
hashStrategy_(HashStrategy::XXHASH),
37+
algorithm_(Algorithm::BLOCK),
38+
compressionStrategy_(CompressionStrategy::UNCOMPRESSED) {}
39+
40+
void BlockSplitBloomFilter::init(uint32_t numBytes) {
41+
if (numBytes < kMinimumBloomFilterBytes) {
42+
numBytes = kMinimumBloomFilterBytes;
43+
}
44+
45+
// Get next power of 2 if it is not power of 2.
46+
if ((numBytes & (numBytes - 1)) != 0) {
47+
numBytes = static_cast<uint32_t>(bits::nextPowerOfTwo(numBytes));
48+
}
49+
50+
if (numBytes > kMaximumBloomFilterBytes) {
51+
numBytes = kMaximumBloomFilterBytes;
52+
}
53+
54+
numBytes_ = numBytes;
55+
data_ = AlignedBuffer::allocate<char>(numBytes_, pool_);
56+
memset(data_->asMutable<char>(), 0, numBytes_);
57+
58+
this->hasher_ = std::make_unique<XxHasher>();
59+
}
60+
61+
void BlockSplitBloomFilter::init(const uint8_t* bitset, uint32_t numBytes) {
62+
VELOX_CHECK(bitset != nullptr);
63+
64+
if (numBytes < kMinimumBloomFilterBytes ||
65+
numBytes > kMaximumBloomFilterBytes || (numBytes & (numBytes - 1)) != 0) {
66+
VELOX_FAIL("Given length of bitset is illegal");
67+
}
68+
69+
numBytes_ = numBytes;
70+
data_ = AlignedBuffer::allocate<char>(numBytes_, pool_);
71+
memcpy(data_->asMutable<char>(), bitset, numBytes_);
72+
73+
this->hasher_ = std::make_unique<XxHasher>();
74+
}
75+
76+
/// Fails unless `header` describes a filter this implementation can read:
/// BLOCK algorithm, XXHASH hash, UNCOMPRESSED compression, and a byte count in
/// (0, BloomFilter::kMaximumBloomFilterBytes]. The checks run in that order
/// and the first violation raises the error.
static void validateBloomFilterHeader(const thrift::BloomFilterHeader& header) {
  if (!header.algorithm.__isset.BLOCK) {
    std::stringstream message;
    message << "Unsupported Bloom filter algorithm: " << header.algorithm;
    VELOX_FAIL(message.str());
  }

  if (!header.hash.__isset.XXHASH) {
    std::stringstream message;
    message << "Unsupported Bloom filter hash: " << header.hash;
    VELOX_FAIL(message.str());
  }

  if (!header.compression.__isset.UNCOMPRESSED) {
    std::stringstream message;
    message << "Unsupported Bloom filter compression: " << header.compression;
    VELOX_FAIL(message.str());
  }

  // The cast is only evaluated once numBytes is known to be positive.
  const bool sizeIsValid = header.numBytes > 0 &&
      static_cast<uint32_t>(header.numBytes) <=
          BloomFilter::kMaximumBloomFilterBytes;
  if (!sizeIsValid) {
    std::stringstream message;
    message << "Bloom filter size is incorrect: " << header.numBytes
            << ". Must be in range (" << 0 << ", "
            << BloomFilter::kMaximumBloomFilterBytes << "].";
    VELOX_FAIL(message.str());
  }
}
104+
105+
/// Reads a thrift BloomFilterHeader followed by the filter bitset from `input`
/// and returns a filter initialized with a copy of that bitset.
///
/// @param input Stream positioned at the start of the serialized header. Must
///   not be null.
/// @param pool Memory pool used for the filter's bitset and for the temporary
///   read buffer.
/// @return The deserialized filter.
///
/// Fails if the stream yields no data, if the header is rejected by
/// validateBloomFilterHeader(), if the header extends past the first buffer
/// returned by the stream, or if the bitset size is rejected by init().
BlockSplitBloomFilter BlockSplitBloomFilter::deserialize(
    dwio::common::SeekableInputStream* input,
    memory::MemoryPool& pool) {
  VELOX_CHECK(input != nullptr);

  const void* headerBuffer = nullptr;
  int32_t size = 0;
  // Next() reports failure through its return value; without this check the
  // code below would read an unset pointer and size.
  VELOX_CHECK(
      input->Next(&headerBuffer, &size),
      "Failed to read Bloom filter header: no data available in input stream");
  const char* bufferStart = reinterpret_cast<const char*>(headerBuffer);
  const char* bufferEnd = bufferStart + size;

  std::shared_ptr<thrift::ThriftTransport> transport =
      std::make_shared<thrift::ThriftStreamingTransport>(
          input, bufferStart, bufferEnd);
  apache::thrift::protocol::TCompactProtocolT<thrift::ThriftTransport> protocol(
      transport);
  thrift::BloomFilterHeader header;
  const uint32_t headerSize = header.read(&protocol);
  validateBloomFilterHeader(header);

  // All size arithmetic is done in signed 64-bit so that mixing the signed
  // buffer size with the unsigned header size can neither wrap nor overflow.
  const int64_t headerBytes = headerSize;
  const int64_t bufferedBytes = size;
  const int64_t bloomFilterSize = header.numBytes;

  // The bitset is located relative to the first buffer. If the header ran past
  // that buffer, the offset below would point outside it.
  VELOX_CHECK_LE(
      headerBytes,
      bufferedBytes,
      "Bloom filter header extends beyond the first input buffer");

  const uint8_t* bitsetInHeaderBuffer =
      reinterpret_cast<const uint8_t*>(headerBuffer) + headerBytes;

  if (headerBytes + bloomFilterSize <= bufferedBytes) {
    // The bloom filter data is entirely contained in the buffer we just read
    // => just return it.
    BlockSplitBloomFilter bloomFilter(&pool);
    bloomFilter.init(
        bitsetInHeaderBuffer, static_cast<uint32_t>(bloomFilterSize));
    return bloomFilter;
  }

  // We have read a part of the bloom filter already, copy it to the target
  // buffer and read the remaining part from the InputStream.
  auto buffer = AlignedBuffer::allocate<char>(bloomFilterSize, &pool);

  // Non-negative thanks to the header-size check above.
  const int64_t bloomFilterSizeInHeaderBuffer = bufferedBytes - headerBytes;
  if (bloomFilterSizeInHeaderBuffer > 0) {
    std::memcpy(
        buffer->asMutable<char>(),
        bitsetInHeaderBuffer,
        bloomFilterSizeInHeaderBuffer);
  }
  const int64_t requiredReadSize =
      bloomFilterSize - bloomFilterSizeInHeaderBuffer;

  input->readFully(
      buffer->asMutable<char>() + bloomFilterSizeInHeaderBuffer,
      requiredReadSize);
  // NOTE(review): buffer->size() appears to reflect the allocation made above
  // rather than the number of bytes actually read - confirm whether this check
  // can ever fail.
  VELOX_CHECK_EQ(
      buffer->size(),
      static_cast<uint64_t>(bloomFilterSize),
      "Bloom Filter read failed: not enough data, read size: {}, actual size: {}",
      buffer->size(),
      bloomFilterSize);
  BlockSplitBloomFilter bloomFilter(&pool);
  bloomFilter.init(
      reinterpret_cast<const uint8_t*>(buffer->as<char>()),
      static_cast<uint32_t>(bloomFilterSize));
  return bloomFilter;
}
160+
161+
/// Serializes the filter to `sink`: first a compact-protocol thrift
/// BloomFilterHeader, then the raw bitset bytes.
///
/// Fails if `sink` is null, if the filter is configured with anything other
/// than BLOCK / XXHASH / UNCOMPRESSED, or if thrift serialization throws.
void BlockSplitBloomFilter::writeTo(
    velox::dwio::common::AppendOnlyBufferedStream* sink) const {
  VELOX_CHECK(sink != nullptr);

  // Reject unsupported configurations before building the header.
  if (algorithm_ != BloomFilter::Algorithm::BLOCK) {
    VELOX_FAIL("BloomFilter does not support Algorithm other than BLOCK");
  }
  if (hashStrategy_ != HashStrategy::XXHASH) {
    VELOX_FAIL("BloomFilter does not support Hash other than XXHASH");
  }
  if (compressionStrategy_ != CompressionStrategy::UNCOMPRESSED) {
    VELOX_FAIL(
        "BloomFilter does not support Compression other than UNCOMPRESSED");
  }

  thrift::BloomFilterHeader header;
  header.algorithm.__set_BLOCK(thrift::SplitBlockAlgorithm());
  header.hash.__set_XXHASH(thrift::XxHash());
  header.compression.__set_UNCOMPRESSED(thrift::Uncompressed());
  header.__set_numBytes(numBytes_);

  using MemoryBuffer = apache::thrift::transport::TMemoryBuffer;
  auto memBuffer = std::make_shared<MemoryBuffer>();
  apache::thrift::protocol::TCompactProtocolFactoryT<MemoryBuffer>
      protocolFactory;
  std::shared_ptr<apache::thrift::protocol::TProtocol> protocol =
      protocolFactory.getProtocol(memBuffer);
  try {
    memBuffer->resetBuffer();
    header.write(protocol.get());
  } catch (const std::exception& e) {
    std::stringstream ss;
    ss << "Couldn't serialize thrift: " << e.what() << "\n";
    VELOX_FAIL(ss.str());
  }

  uint8_t* serializedHeader;
  uint32_t serializedHeaderLength;
  memBuffer->getBuffer(&serializedHeader, &serializedHeaderLength);

  // Header first, bitset second.
  sink->write(
      reinterpret_cast<const char*>(serializedHeader), serializedHeaderLength);
  sink->write(data_->as<char>(), numBytes_);
}
204+
205+
bool BlockSplitBloomFilter::findHash(uint64_t hash) const {
206+
const uint32_t bucketIndex = static_cast<uint32_t>(
207+
((hash >> 32) * (numBytes_ / kBytesPerFilterBlock)) >> 32);
208+
const uint32_t key = static_cast<uint32_t>(hash);
209+
const uint32_t* bitset32 =
210+
reinterpret_cast<const uint32_t*>(data_->as<char>());
211+
212+
for (int i = 0; i < kBitsSetPerBlock; ++i) {
213+
// Calculate mask for key in the given bitset.
214+
const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
215+
if (0 == (bitset32[kBitsSetPerBlock * bucketIndex + i] & mask)) {
216+
return false;
217+
}
218+
}
219+
return true;
220+
}
221+
222+
void BlockSplitBloomFilter::insertHashImpl(uint64_t hash) {
223+
const uint32_t bucketIndex = static_cast<uint32_t>(
224+
((hash >> 32) * (numBytes_ / kBytesPerFilterBlock)) >> 32);
225+
const uint32_t key = static_cast<uint32_t>(hash);
226+
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->asMutable<char>());
227+
228+
for (int i = 0; i < kBitsSetPerBlock; i++) {
229+
// Calculate mask for key in the given bitset.
230+
const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
231+
bitset32[bucketIndex * kBitsSetPerBlock + i] |= mask;
232+
}
233+
}
234+
235+
void BlockSplitBloomFilter::insertHash(uint64_t hash) {
236+
insertHashImpl(hash);
237+
}
238+
239+
void BlockSplitBloomFilter::insertHashes(
240+
const uint64_t* hashes,
241+
int numValues) {
242+
for (int i = 0; i < numValues; ++i) {
243+
insertHashImpl(hashes[i]);
244+
}
245+
}
246+
247+
} // namespace facebook::velox::parquet

0 commit comments

Comments
 (0)