Skip to content

Commit

Permalink
Add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Feb 21, 2025
1 parent 3a02d43 commit 3aa1d12
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 21 deletions.
14 changes: 10 additions & 4 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,19 @@ std::shared_ptr<const Type> ReaderBase::convertType(
return SMALLINT();
case TypeKind::INTEGER:
return INTEGER();
case TypeKind::BIGINT:
case TypeKind::BIGINT: {
TypePtr converted;
if (type.format() == DwrfFormat::kOrc &&
type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) {
return DECIMAL(
type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
converted =
DECIMAL(type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
} else {
converted = BIGINT();
common::testutil::TestValue::adjust(
"facebook::velox::dwrf::ReaderBase::convertType", &converted);
}
return BIGINT();
return converted;
}
case TypeKind::HUGEINT:
if (type.format() == DwrfFormat::kOrc &&
type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) {
Expand Down
86 changes: 77 additions & 9 deletions velox/dwio/dwrf/test/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/common/base/Portability.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/tests/utils/E2EFilterTestBase.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
Expand Down Expand Up @@ -64,11 +65,11 @@ class E2EFilterTest : public E2EFilterTestBase {
const TypePtr& type,
const std::vector<RowVectorPtr>& batches,
bool forRowGroupSkip = false) override {
auto options = createWriterOptions(type);
setWriterOptions(type);
int32_t flushCounter = 0;
// If we test row group skip, we have all the data in one stripe. For
// scan, we start a stripe every 'flushEveryNBatches_' batches.
options.flushPolicyFactory = [&]() {
options_.flushPolicyFactory = [&]() {
return std::make_unique<LambdaFlushPolicy>([&]() {
return forRowGroupSkip ? false
: (++flushCounter % flushEveryNBatches_ == 0);
Expand All @@ -80,8 +81,8 @@ class E2EFilterTest : public E2EFilterTestBase {
dwio::common::FileSink::Options{.pool = leafPool_.get()});
ASSERT_TRUE(sink->isBuffered());
auto* sinkPtr = sink.get();
options.memoryPool = rootPool_.get();
writer_ = std::make_unique<dwrf::Writer>(std::move(sink), options);
options_.memoryPool = rootPool_.get();
writer_ = std::make_unique<dwrf::Writer>(std::move(sink), options_);
for (auto& batch : batches) {
writer_->write(batch);
}
Expand All @@ -105,9 +106,10 @@ class E2EFilterTest : public E2EFilterTestBase {
}

std::unordered_set<std::string> flatMapColumns_;
dwrf::WriterOptions options_;

private:
dwrf::WriterOptions createWriterOptions(const TypePtr& type) {
void setWriterOptions(const TypePtr& type) {
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::COMPRESSION, CompressionKind_NONE);
config->set(dwrf::Config::USE_VINTS, useVInts_);
Expand Down Expand Up @@ -148,10 +150,8 @@ class E2EFilterTest : public E2EFilterTestBase {
config->set<const std::vector<std::vector<std::string>>>(
dwrf::Config::MAP_FLAT_COLS_STRUCT_KEYS, mapFlatColsStructKeys);
}
dwrf::WriterOptions options;
options.config = config;
options.schema = writerSchema;
return options;
options_.config = config;
options_.schema = writerSchema;
}

std::unique_ptr<dwrf::Writer> writer_;
Expand Down Expand Up @@ -227,6 +227,74 @@ TEST_F(E2EFilterTest, byteRle) {
20);
}

// Runs the end-to-end filter test over several short-decimal columns with the
// writer format switched to ORC. For each column, a test-value callback is
// registered on the "ReaderBase::convertType" point that overwrites the
// TypePtr handed to it with the decimal type under test.
DEBUG_ONLY_TEST_F(E2EFilterTest, shortDecimal) {
  testutil::TestValue::enable();
  options_.format = DwrfFormat::kOrc;

  // Schema string -> decimal type injected through the test-value callback.
  const std::unordered_map<std::string, TypePtr> decimalColumns = {
      {"shortdecimal_val:decimal(8, 5)", DECIMAL(8, 5)},
      {"shortdecimal_val:decimal(10, 5)", DECIMAL(10, 5)},
      {"shortdecimal_val:decimal(17, 5)", DECIMAL(17, 5)}};

  // Data customization shared by every column variant; the values are
  // generated as int64_t.
  const auto customize = [&]() {
    makeIntDistribution<int64_t>(
        "shortdecimal_val",
        10, // min
        100, // max
        22, // repeats
        19, // rareFrequency
        -999, // rareMin
        30000, // rareMax
        true);
  };

  for (const auto& entry : decimalColumns) {
    const TypePtr& decimalType = entry.second;
    // The scoped callback is declared inside the loop body, so each
    // iteration registers its own decimal type.
    SCOPED_TESTVALUE_SET(
        "facebook::velox::dwrf::ReaderBase::convertType",
        std::function<void(TypePtr*)>(
            [&](TypePtr* converted) { *converted = decimalType; }));
    testWithTypes(entry.first, customize, false, {"shortdecimal_val"}, 20);
  }

  // Put the writer format back to DWRF once all variants have run.
  options_.format = DwrfFormat::kDwrf;
}

// Runs the end-to-end filter test over several long-decimal columns with the
// writer format switched to ORC. Two test-value callbacks are active:
//  - "ProtoUtils::writeType": sets the bool passed to it to true for the
//    whole test.
//  - "ReaderBase::convertType": per column, overwrites the TypePtr handed to
//    it with the decimal type under test.
DEBUG_ONLY_TEST_F(E2EFilterTest, longDecimal) {
  testutil::TestValue::enable();
  options_.format = DwrfFormat::kOrc;

  // Schema string -> decimal type injected through the test-value callback.
  const std::unordered_map<std::string, TypePtr> decimalColumns = {
      {"longdecimal_val:decimal(30, 10)", DECIMAL(30, 10)},
      {"longdecimal_val:decimal(37, 15)", DECIMAL(37, 15)}};

  // Registered once, outside the loop, so it stays in effect for every
  // column variant.
  SCOPED_TESTVALUE_SET(
      "facebook::velox::dwrf::ProtoUtils::writeType",
      std::function<void(bool*)>([&](bool* kindSet) { *kindSet = true; }));

  // Data customization shared by every column variant; the values are
  // generated as int128_t.
  const auto customize = [&]() {
    makeIntDistribution<int128_t>(
        "longdecimal_val",
        10, // min
        100, // max
        22, // repeats
        19, // rareFrequency
        -999, // rareMin
        30000, // rareMax
        true);
  };

  for (const auto& entry : decimalColumns) {
    const TypePtr& decimalType = entry.second;
    // The scoped callback is declared inside the loop body, so each
    // iteration registers its own decimal type.
    SCOPED_TESTVALUE_SET(
        "facebook::velox::dwrf::ReaderBase::convertType",
        std::function<void(TypePtr*)>(
            [&](TypePtr* converted) { *converted = decimalType; }));
    testWithTypes(entry.first, customize, true, {"longdecimal_val"}, 20);
  }

  // Put the writer format back to DWRF once all variants have run.
  options_.format = DwrfFormat::kDwrf;
}

TEST_F(E2EFilterTest, floatAndDouble) {
testWithTypes(
"float_val:float,"
Expand Down
19 changes: 16 additions & 3 deletions velox/dwio/dwrf/utils/ProtoUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,22 @@ void ProtoUtils::writeType(
if (parent) {
parent->add_subtypes(footer.types_size() - 1);
}
auto kind =
VELOX_STATIC_FIELD_DYNAMIC_DISPATCH(SchemaType, kind, type.kind());
self->set_kind(kind);
bool kindSet = false;
if (type.kind() == TypeKind::HUGEINT) {
// Hugeint is not supported by DWRF, and this branch is only for ORC
// testing before the ORC footer write is implemented.
auto kind = SchemaType<TypeKind::BIGINT>::kind;
self->set_kind(kind);
common::testutil::TestValue::adjust(
"facebook::velox::dwrf::ProtoUtils::writeType", &kindSet);
} else {
auto kind =
VELOX_STATIC_FIELD_DYNAMIC_DISPATCH(SchemaType, kind, type.kind());
self->set_kind(kind);
kindSet = true;
}
VELOX_CHECK(kindSet, "Unknown type {}.", type.toString());

switch (type.kind()) {
case TypeKind::ROW: {
auto& row = type.asRow();
Expand Down
28 changes: 24 additions & 4 deletions velox/dwio/dwrf/writer/ColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2183,7 +2183,12 @@ std::unique_ptr<BaseColumnWriter> BaseColumnWriter::create(
context, type, sequence, onRecordPosition);
ret->children_.reserve(type.size());
for (int32_t i = 0; i < type.size(); ++i) {
ret->children_.push_back(create(context, *type.childAt(i), sequence));
ret->children_.push_back(create(
context,
*type.childAt(i),
sequence,
/*onRecordPosition=*/nullptr,
format));
}
return ret;
}
Expand All @@ -2199,15 +2204,30 @@ std::unique_ptr<BaseColumnWriter> BaseColumnWriter::create(
}
auto ret = std::make_unique<MapColumnWriter>(
context, type, sequence, onRecordPosition);
ret->children_.push_back(create(context, *type.childAt(0), sequence));
ret->children_.push_back(create(context, *type.childAt(1), sequence));
ret->children_.push_back(create(
context,
*type.childAt(0),
sequence,
/*onRecordPosition=*/nullptr,
format));
ret->children_.push_back(create(
context,
*type.childAt(1),
sequence,
/*onRecordPosition=*/nullptr,
format));
return ret;
}
case TypeKind::ARRAY: {
VELOX_CHECK_EQ(type.size(), 1, "Array should have exactly one child");
auto ret = std::make_unique<ListColumnWriter>(
context, type, sequence, onRecordPosition);
ret->children_.push_back(create(context, *type.childAt(0), sequence));
ret->children_.push_back(create(
context,
*type.childAt(0),
sequence,
/*onRecordPosition=*/nullptr,
format));
return ret;
}
default:
Expand Down
7 changes: 6 additions & 1 deletion velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ Writer::Writer(
}

if (options.columnWriterFactory == nullptr) {
writer_ = BaseColumnWriter::create(writerBase_->getContext(), *schema_);
writer_ = BaseColumnWriter::create(
writerBase_->getContext(),
*schema_,
/*sequence=*/0,
/*onRecordPosition=*/nullptr,
options.format);
} else {
writer_ = options.columnWriterFactory(writerBase_->getContext(), *schema_);
}
Expand Down

0 comments on commit 3aa1d12

Please sign in to comment.