Skip to content

Commit 1a25ec2

Browse files
committed
refactor: Refactor FileMetadata to support writing of ORC's proto for ORC writing preparation
1 parent 4b1740d commit 1a25ec2

27 files changed

+1474
-337
lines changed

velox/dwio/dwrf/common/FileMetadata.h

+1,030
Large diffs are not rendered by default.

velox/dwio/dwrf/test/ColumnWriterIndexTest.cpp

+18-12
Original file line numberDiff line numberDiff line change
@@ -371,9 +371,10 @@ class WriterEncodingIndexTest2 {
371371
}
372372
}
373373
proto::StripeFooter stripeFooter;
374+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
374375
columnWriter->flush(
375-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
376-
return *stripeFooter.add_encoding();
376+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
377+
return sfw.addEncoding();
377378
});
378379

379380
// Simulate continue writing to next stripe, so internally buffered data
@@ -821,9 +822,10 @@ class IntegerColumnWriterDirectEncodingIndexTest : public testing::Test {
821822
// *all* streams
822823
EXPECT_CALL(*mockIndexBuilderPtr, add(0, -1)).Times(positionCount);
823824
proto::StripeFooter stripeFooter;
825+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
824826
columnWriter->flush(
825-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
826-
return *stripeFooter.add_encoding();
827+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
828+
return sfw.addEncoding();
827829
});
828830
} else {
829831
for (size_t i = 0; i != pageCount; ++i) {
@@ -847,9 +849,10 @@ class IntegerColumnWriterDirectEncodingIndexTest : public testing::Test {
847849
EXPECT_CALL(*mockIndexBuilderPtr, flush());
848850
EXPECT_CALL(*mockIndexBuilderPtr, add(0, -1)).Times(positionCount);
849851
proto::StripeFooter stripeFooter;
852+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
850853
columnWriter->flush(
851-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
852-
return *stripeFooter.add_encoding();
854+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
855+
return sfw.addEncoding();
853856
});
854857
}
855858

@@ -972,9 +975,10 @@ class StringColumnWriterDictionaryEncodingIndexTest : public testing::Test {
972975
// Recording PRESENT stream starting positions for the new stripe.
973976
EXPECT_CALL(*mockIndexBuilderPtr, add(0, -1)).Times(4);
974977
proto::StripeFooter stripeFooter;
978+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
975979
columnWriter->flush(
976-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
977-
return *stripeFooter.add_encoding();
980+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
981+
return sfw.addEncoding();
978982
});
979983

980984
// Simulate continue writing to next stripe, so internally buffered data
@@ -1128,9 +1132,10 @@ class StringColumnWriterDirectEncodingIndexTest : public testing::Test {
11281132
// *all* streams
11291133
EXPECT_CALL(*mockIndexBuilderPtr, add(0, -1)).Times(positionCount);
11301134
proto::StripeFooter stripeFooter;
1135+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
11311136
columnWriter->flush(
1132-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
1133-
return *stripeFooter.add_encoding();
1137+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
1138+
return sfw.addEncoding();
11341139
});
11351140
} else {
11361141
for (size_t i = 0; i != pageCount; ++i) {
@@ -1154,9 +1159,10 @@ class StringColumnWriterDirectEncodingIndexTest : public testing::Test {
11541159
EXPECT_CALL(*mockIndexBuilderPtr, flush());
11551160
EXPECT_CALL(*mockIndexBuilderPtr, add(0, -1)).Times(positionCount);
11561161
proto::StripeFooter stripeFooter;
1162+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
11571163
columnWriter->flush(
1158-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
1159-
return *stripeFooter.add_encoding();
1164+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
1165+
return sfw.addEncoding();
11601166
});
11611167
}
11621168

velox/dwio/dwrf/test/ColumnWriterTest.cpp

+36-25
Original file line numberDiff line numberDiff line change
@@ -328,34 +328,36 @@ void testDataTypeWriter(
328328
const TypePtr& type,
329329
std::vector<std::optional<T>>& data,
330330
const uint32_t sequence = 0,
331-
DwrfFormat format = DwrfFormat::kDwrf) {
331+
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF) {
332332
// Generate a seed and randomly shuffle the data
333333
uint32_t seed = Random::rand32();
334334
std::shuffle(data.begin(), data.end(), std::default_random_engine(seed));
335335

336336
auto config = std::make_shared<Config>();
337337
auto pool = memory::memoryManager()->addLeafPool();
338-
WriterContext context{config, memory::memoryManager()->addRootPool()};
338+
WriterContext context{
339+
config, memory::memoryManager()->addRootPool(), fileFormat};
339340
context.initBuffer();
340341
auto rowType = ROW({type});
341342
auto dataTypeWithId = TypeWithId::create(type, 1);
342343

343344
// write
344-
auto writer = BaseColumnWriter::create(
345-
context, *dataTypeWithId, sequence, nullptr, format);
345+
auto writer =
346+
BaseColumnWriter::create(context, *dataTypeWithId, sequence, nullptr);
346347
auto size = data.size();
347348
auto batch = populateBatch(data, pool.get(), type);
348349
const size_t stripeCount = 2;
349350
const size_t strideCount = 3;
350351

351352
for (auto stripeI = 0; stripeI < stripeCount; ++stripeI) {
352353
proto::StripeFooter sf;
354+
auto sfw = StripeFooterWriteWrapper(&sf);
353355
for (auto strideI = 0; strideI < strideCount; ++strideI) {
354356
writer->write(batch, common::Ranges::of(0, size));
355357
writer->createIndexEntry();
356358
}
357-
writer->flush([&sf](uint32_t /* unused */) -> proto::ColumnEncoding& {
358-
return *sf.add_encoding();
359+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
360+
return sfw.addEncoding();
359361
});
360362

361363
TestStripeStreams streams(context, sf, rowType, pool.get());
@@ -461,7 +463,7 @@ TEST_F(ColumnWriterTest, TestNullBooleanWriter) {
461463
}
462464

463465
TEST_F(ColumnWriterTest, testDecimalWriter) {
464-
const auto format = DwrfFormat::kOrc;
466+
const auto format = dwio::common::FileFormat::ORC;
465467
auto genShortDecimals = [&](bool hasNull) {
466468
std::vector<std::optional<int64_t>> shortDecimals;
467469
for (auto i = 0; i < ITERATIONS; ++i) {
@@ -1003,6 +1005,7 @@ void testMapWriter(
10031005
}
10041006

10051007
proto::StripeFooter sf;
1008+
auto sfw = StripeFooterWriteWrapper(&sf);
10061009
std::vector<VectorPtr> writtenBatches;
10071010

10081011
// Write map/row
@@ -1020,8 +1023,8 @@ void testMapWriter(
10201023
writtenBatches.push_back(toWrite);
10211024
}
10221025

1023-
writer->flush([&sf](uint32_t /* unused */) -> proto::ColumnEncoding& {
1024-
return *sf.add_encoding();
1026+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
1027+
return sfw.addEncoding();
10251028
});
10261029

10271030
auto validate = [&](bool returnFlatVector = false) {
@@ -1145,6 +1148,7 @@ void testMapWriterRow(
11451148
}
11461149

11471150
proto::StripeFooter sf;
1151+
auto sfw = StripeFooterWriteWrapper(&sf);
11481152
std::vector<VectorPtr> writtenBatches;
11491153

11501154
// Write map/row
@@ -1156,8 +1160,8 @@ void testMapWriterRow(
11561160
writer->createIndexEntry();
11571161
writtenBatches.push_back(toWrite);
11581162

1159-
writer->flush([&sf](uint32_t /* unused */) -> proto::ColumnEncoding& {
1160-
return *sf.add_encoding();
1163+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
1164+
return sfw.addEncoding();
11611165
});
11621166

11631167
auto validate = [&](bool returnFlatVector = false) {
@@ -2174,15 +2178,16 @@ struct IntegerColumnWriterTypedTestCase {
21742178

21752179
for (size_t i = 0; i != flushCount; ++i) {
21762180
proto::StripeFooter stripeFooter;
2181+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
21772182
for (size_t j = 0; j != repetitionCount; ++j) {
21782183
columnWriter->write(batch, common::Ranges::of(0, batch->size()));
21792184
postProcess(*columnWriter, i, j);
21802185
columnWriter->createIndexEntry();
21812186
}
21822187
// We only flush once per stripe.
21832188
columnWriter->flush(
2184-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
2185-
return *stripeFooter.add_encoding();
2189+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
2190+
return sfw.addEncoding();
21862191
});
21872192

21882193
// Read and verify.
@@ -3408,6 +3413,7 @@ struct StringColumnWriterTestCase {
34083413

34093414
for (size_t i = 0; i != flushCount; ++i) {
34103415
proto::StripeFooter stripeFooter;
3416+
auto sfw = StripeFooterWriteWrapper(&stripeFooter);
34113417
// Write Stride
34123418
for (size_t j = 0; j != repetitionCount; ++j) {
34133419
// TODO: break the batch into multiple strides.
@@ -3418,8 +3424,8 @@ struct StringColumnWriterTestCase {
34183424

34193425
// Flush when all strides are written (once per stripe).
34203426
columnWriter->flush(
3421-
[&stripeFooter](uint32_t /* unused */) -> proto::ColumnEncoding& {
3422-
return *stripeFooter.add_encoding();
3427+
[&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
3428+
return sfw.addEncoding();
34233429
});
34243430

34253431
// Read and verify.
@@ -4248,8 +4254,9 @@ TEST_F(ColumnWriterTest, IntDictWriterDirectValueOverflow) {
42484254
writer->write(vector, common::Ranges::of(0, size));
42494255
writer->createIndexEntry();
42504256
proto::StripeFooter sf;
4251-
writer->flush([&sf](auto /* unused */) -> proto::ColumnEncoding& {
4252-
return *sf.add_encoding();
4257+
auto sfw = StripeFooterWriteWrapper(&sf);
4258+
writer->flush([&sfw](auto /* unused */) -> ColumnEncodingWriteWrapper {
4259+
return sfw.addEncoding();
42534260
});
42544261
auto& enc = sf.encoding(0);
42554262
ASSERT_EQ(enc.kind(), proto::ColumnEncoding_Kind_DICTIONARY);
@@ -4293,8 +4300,9 @@ TEST_F(ColumnWriterTest, ShortDictWriterDictValueOverflow) {
42934300
writer->write(vector, common::Ranges::of(0, size));
42944301
writer->createIndexEntry();
42954302
proto::StripeFooter sf;
4296-
writer->flush([&sf](auto /* unused */) -> proto::ColumnEncoding& {
4297-
return *sf.add_encoding();
4303+
auto sfw = StripeFooterWriteWrapper(&sf);
4304+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
4305+
return sfw.addEncoding();
42984306
});
42994307
auto& enc = sf.encoding(0);
43004308
ASSERT_EQ(enc.kind(), proto::ColumnEncoding_Kind_DICTIONARY);
@@ -4334,8 +4342,9 @@ TEST_F(ColumnWriterTest, RemovePresentStream) {
43344342
writer->write(vector, common::Ranges::of(0, size));
43354343
writer->createIndexEntry();
43364344
proto::StripeFooter sf;
4337-
writer->flush([&sf](auto /* unused */) -> proto::ColumnEncoding& {
4338-
return *sf.add_encoding();
4345+
auto sfw = StripeFooterWriteWrapper(&sf);
4346+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
4347+
return sfw.addEncoding();
43394348
});
43404349

43414350
// get data stream
@@ -4372,8 +4381,9 @@ TEST_F(ColumnWriterTest, ColumnIdInStream) {
43724381
writer->write(vector, common::Ranges::of(0, size));
43734382
writer->createIndexEntry();
43744383
proto::StripeFooter sf;
4375-
writer->flush([&sf](auto /* unused */) -> proto::ColumnEncoding& {
4376-
return *sf.add_encoding();
4384+
auto sfw = StripeFooterWriteWrapper(&sf);
4385+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
4386+
return sfw.addEncoding();
43774387
});
43784388

43794389
// get data stream
@@ -4501,8 +4511,9 @@ struct DictColumnWriterTestCase {
45014511
writer->createIndexEntry();
45024512

45034513
proto::StripeFooter sf;
4504-
writer->flush([&sf](uint32_t /* unused */) -> proto::ColumnEncoding& {
4505-
return *sf.add_encoding();
4514+
auto sfw = StripeFooterWriteWrapper(&sf);
4515+
writer->flush([&sfw](uint32_t /* unused */) -> ColumnEncodingWriteWrapper {
4516+
return sfw.addEncoding();
45064517
});
45074518

45084519
// Reading the vector out

velox/dwio/dwrf/test/DecryptionTests.cpp

+24-12
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ TEST(Decryption, NotEncrypted) {
3232
HiveTypeParser parser;
3333
auto type = parser.parse("struct<a:int>");
3434
proto::Footer footer;
35-
ProtoUtils::writeType(*type, footer);
35+
auto footerWrapper = FooterWriteWrapper(&footer);
36+
ProtoUtils::writeType(*type, footerWrapper);
3637
TestDecrypterFactory factory;
3738
auto handler = DecryptionHandler::create(footer, &factory);
3839
ASSERT_FALSE(handler->isEncrypted());
@@ -42,7 +43,8 @@ TEST(Decryption, NoKeyProvider) {
4243
HiveTypeParser parser;
4344
auto type = parser.parse("struct<a:int>");
4445
proto::Footer footer;
45-
ProtoUtils::writeType(*type, footer);
46+
auto footerWrapper = FooterWriteWrapper(&footer);
47+
ProtoUtils::writeType(*type, footerWrapper);
4648
footer.mutable_encryption();
4749
TestDecrypterFactory factory;
4850
ASSERT_THROW(
@@ -53,7 +55,8 @@ TEST(Decryption, EmptyGroup) {
5355
HiveTypeParser parser;
5456
auto type = parser.parse("struct<a:int>");
5557
proto::Footer footer;
56-
ProtoUtils::writeType(*type, footer);
58+
auto footerWrapper = FooterWriteWrapper(&footer);
59+
ProtoUtils::writeType(*type, footerWrapper);
5760
auto enc = footer.mutable_encryption();
5861
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
5962
TestDecrypterFactory factory;
@@ -65,7 +68,8 @@ TEST(Decryption, EmptyNodes) {
6568
HiveTypeParser parser;
6669
auto type = parser.parse("struct<a:int>");
6770
proto::Footer footer;
68-
ProtoUtils::writeType(*type, footer);
71+
auto footerWrapper = FooterWriteWrapper(&footer);
72+
ProtoUtils::writeType(*type, footerWrapper);
6973
auto enc = footer.mutable_encryption();
7074
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
7175
auto group = enc->add_encryptiongroups();
@@ -79,7 +83,8 @@ TEST(Decryption, StatsMismatch) {
7983
HiveTypeParser parser;
8084
auto type = parser.parse("struct<a:int>");
8185
proto::Footer footer;
82-
ProtoUtils::writeType(*type, footer);
86+
auto footerWrapper = FooterWriteWrapper(&footer);
87+
ProtoUtils::writeType(*type, footerWrapper);
8388
auto enc = footer.mutable_encryption();
8489
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
8590
auto group = enc->add_encryptiongroups();
@@ -96,7 +101,8 @@ TEST(Decryption, KeyExistenceMismatch) {
96101
HiveTypeParser parser;
97102
auto type = parser.parse("struct<a:int>");
98103
proto::Footer footer;
99-
ProtoUtils::writeType(*type, footer);
104+
auto footerWrapper = FooterWriteWrapper(&footer);
105+
ProtoUtils::writeType(*type, footerWrapper);
100106
auto enc = footer.mutable_encryption();
101107
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
102108
for (size_t i = 0; i < 2; ++i) {
@@ -116,7 +122,8 @@ TEST(Decryption, ReuseStripeKey) {
116122
HiveTypeParser parser;
117123
auto type = parser.parse("struct<a:int>");
118124
proto::Footer footer;
119-
ProtoUtils::writeType(*type, footer);
125+
auto footerWrapper = FooterWriteWrapper(&footer);
126+
ProtoUtils::writeType(*type, footerWrapper);
120127
auto enc = footer.mutable_encryption();
121128
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
122129
auto group = enc->add_encryptiongroups();
@@ -135,7 +142,8 @@ TEST(Decryption, StripeKeyMismatch) {
135142
HiveTypeParser parser;
136143
auto type = parser.parse("struct<a:int>");
137144
proto::Footer footer;
138-
ProtoUtils::writeType(*type, footer);
145+
auto footerWrapper = FooterWriteWrapper(&footer);
146+
ProtoUtils::writeType(*type, footerWrapper);
139147
auto enc = footer.mutable_encryption();
140148
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
141149
auto group = enc->add_encryptiongroups();
@@ -153,7 +161,8 @@ TEST(Decryption, Basic) {
153161
HiveTypeParser parser;
154162
auto type = parser.parse("struct<a:int,b:float,c:string,d:double>");
155163
proto::Footer footer;
156-
ProtoUtils::writeType(*type, footer);
164+
auto footerWrapper = FooterWriteWrapper(&footer);
165+
ProtoUtils::writeType(*type, footerWrapper);
157166
auto enc = footer.mutable_encryption();
158167
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
159168
for (auto i = 0; i < 5; ++i) {
@@ -183,7 +192,8 @@ TEST(Decryption, NestedType) {
183192
auto type = parser.parse(
184193
"struct<a:int,b:map<float,map<int,double>>,c:struct<a:string,b:int>,d:array<double>>");
185194
proto::Footer footer;
186-
ProtoUtils::writeType(*type, footer);
195+
auto footerWrapper = FooterWriteWrapper(&footer);
196+
ProtoUtils::writeType(*type, footerWrapper);
187197
auto enc = footer.mutable_encryption();
188198
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
189199

@@ -222,7 +232,8 @@ TEST(Decryption, RootNode) {
222232
HiveTypeParser parser;
223233
auto type = parser.parse("struct<a:int,b:int>");
224234
proto::Footer footer;
225-
ProtoUtils::writeType(*type, footer);
235+
auto footerWrapper = FooterWriteWrapper(&footer);
236+
ProtoUtils::writeType(*type, footerWrapper);
226237
auto enc = footer.mutable_encryption();
227238
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
228239
auto group = enc->add_encryptiongroups();
@@ -238,7 +249,8 @@ TEST(Decryption, GroupOverlap) {
238249
HiveTypeParser parser;
239250
auto type = parser.parse("struct<a:struct<a:float,b:double>>");
240251
proto::Footer footer;
241-
ProtoUtils::writeType(*type, footer);
252+
auto footerWrapper = FooterWriteWrapper(&footer);
253+
ProtoUtils::writeType(*type, footerWrapper);
242254
auto enc = footer.mutable_encryption();
243255
enc->set_keyprovider(proto::Encryption_KeyProvider_UNKNOWN);
244256

0 commit comments

Comments
 (0)