From 8a3c2c2d296cba012f967dd81bb8c8be0fffddf6 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Tue, 11 Feb 2025 16:42:55 +0000 Subject: [PATCH 1/4] Support scan filter for decimal in ORC --- .../reader/SelectiveDecimalColumnReader.cpp | 173 ++++++++++++++++-- .../reader/SelectiveDecimalColumnReader.h | 19 +- velox/exec/tests/CMakeLists.txt | 1 + velox/exec/tests/TableScanTest.cpp | 121 ++++++++++++ velox/exec/tests/data/long_decimal.orc | Bin 0 -> 485 bytes velox/exec/tests/data/short_decimal.orc | Bin 0 -> 429 bytes 6 files changed, 301 insertions(+), 13 deletions(-) create mode 100644 velox/exec/tests/data/long_decimal.orc create mode 100644 velox/exec/tests/data/short_decimal.orc diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp index 4f6c19d8c445..1a3d369384bf 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp @@ -67,16 +67,17 @@ void SelectiveDecimalColumnReader::seekToRowGroup(int64_t index) { template template -void SelectiveDecimalColumnReader::readHelper(RowSet rows) { - vector_size_t numRows = rows.back() + 1; +void SelectiveDecimalColumnReader::readHelper( + common::Filter* filter, + RowSet rows) { ExtractToReader extractValues(this); - common::AlwaysTrue filter; + common::AlwaysTrue alwaysTrue; DirectRleColumnVisitor< int64_t, common::AlwaysTrue, decltype(extractValues), kDense> - visitor(filter, this, rows, extractValues); + visitor(alwaysTrue, this, rows, extractValues); // decode scale stream if (version_ == velox::dwrf::RleVersion_1) { @@ -96,14 +97,161 @@ void SelectiveDecimalColumnReader::readHelper(RowSet rows) { // reset numValues_ before reading values numValues_ = 0; valueSize_ = sizeof(DataT); + vector_size_t numRows = rows.back() + 1; ensureValuesCapacity(numRows); // decode value stream facebook::velox::dwio::common:: ColumnVisitor - valueVisitor(filter, this, rows, extractValues); + valueVisitor(alwaysTrue, this, rows, extractValues); decodeWithVisitor>(valueDecoder_.get(), valueVisitor); readOffset_ += numRows; + + // Fill decimals before applying filter. + fillDecimals(); + + const auto rawNulls = nullsInReadRange_ + ? (kDense ? nullsInReadRange_->as() : rawResultNulls_) + : nullptr; + // Process filter. + process(filter, rows, rawNulls); +} + +template +void SelectiveDecimalColumnReader::processNulls( + bool isNull, + const RowSet& rows, + const uint64_t* rawNulls) { + if (!rawNulls) { + return; + } + returnReaderNulls_ = false; + anyNulls_ = !isNull; + allNull_ = isNull; + + auto rawDecimal = values_->asMutable(); + auto rawScale = scaleBuffer_->asMutable(); + + vector_size_t idx = 0; + if (isNull) { + for (vector_size_t i = 0; i < numValues_; i++) { + if (bits::isBitNull(rawNulls, i)) { + bits::setNull(rawResultNulls_, idx); + addOutputRow(rows[i]); + idx++; + } + } + } else { + for (vector_size_t i = 0; i < numValues_; i++) { + if (!bits::isBitNull(rawNulls, i)) { + bits::setNull(rawResultNulls_, idx, false); + rawDecimal[idx] = rawDecimal[i]; + rawScale[idx] = rawScale[i]; + addOutputRow(rows[i]); + idx++; + } + } + } +} + +template +void SelectiveDecimalColumnReader::processFilter( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls) { + VELOX_CHECK_NOT_NULL(filter, "Filter must not be null."); + returnReaderNulls_ = false; + anyNulls_ = false; + allNull_ = true; + + vector_size_t idx = 0; + auto rawDecimal = values_->asMutable(); + for (vector_size_t i = 0; i < numValues_; i++) { + if (rawNulls && bits::isBitNull(rawNulls, i)) { + if (filter->testNull()) { + bits::setNull(rawResultNulls_, idx); + addOutputRow(rows[i]); + anyNulls_ = true; + idx++; + } + } else { + bool tested; + if constexpr (std::is_same_v) { + tested = filter->testInt64(rawDecimal[i]); + } else { + tested = filter->testInt128(rawDecimal[i]); + } + + if (tested) { + if (rawNulls) { + bits::setNull(rawResultNulls_, idx, false); + } + rawDecimal[idx] = rawDecimal[i]; + addOutputRow(rows[i]); + allNull_ = false; + idx++; + } + } + } +} + +template +void SelectiveDecimalColumnReader::process( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls) { + // Treat the filter as kAlwaysTrue if any of the following conditions are met: + // 1) No filter found; + // 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null). + auto filterKind = + !filter || (filter->kind() == common::FilterKind::kIsNotNull && !rawNulls) + ? common::FilterKind::kAlwaysTrue + : filter->kind(); + switch (filterKind) { + case common::FilterKind::kAlwaysTrue: + // Simply add all rows to output. + for (vector_size_t i = 0; i < numValues_; i++) { + addOutputRow(rows[i]); + } + break; + case common::FilterKind::kIsNull: + processNulls(true, rows, rawNulls); + break; + case common::FilterKind::kIsNotNull: + processNulls(false, rows, rawNulls); + break; + case common::FilterKind::kBigintRange: + case common::FilterKind::kBigintValuesUsingHashTable: + case common::FilterKind::kBigintValuesUsingBitmask: + case common::FilterKind::kNegatedBigintRange: + case common::FilterKind::kNegatedBigintValuesUsingHashTable: + case common::FilterKind::kNegatedBigintValuesUsingBitmask: + case common::FilterKind::kBigintMultiRange: { + if constexpr (std::is_same_v) { + processFilter(filter, rows, rawNulls); + } else { + const auto actualType = CppToType::create(); + VELOX_NYI( + "Expected type BIGINT, but found file type {}.", + actualType->toString()); + } + break; + } + case common::FilterKind::kHugeintValuesUsingHashTable: + case common::FilterKind::kHugeintRange: { + if constexpr (std::is_same_v) { + processFilter(filter, rows, rawNulls); + } else { + const auto actualType = CppToType::create(); + VELOX_NYI( + "Expected type HUGEINT, but found file type {}.", + actualType->toString()); + } + break; + } + default: + VELOX_NYI("Unsupported filter: {}.", static_cast(filterKind)); + } } template @@ -111,14 +259,13 @@ void SelectiveDecimalColumnReader::read( int64_t offset, const RowSet& rows, const uint64_t* incomingNulls) { - VELOX_CHECK(!scanSpec_->filter()); VELOX_CHECK(!scanSpec_->valueHook()); prepareRead(offset, rows, incomingNulls); bool isDense = rows.back() == rows.size() - 1; if (isDense) { - readHelper(rows); + readHelper(scanSpec_->filter(), rows); } else { - readHelper(rows); + readHelper(scanSpec_->filter(), rows); } } @@ -126,16 +273,18 @@ template void SelectiveDecimalColumnReader::getValues( const RowSet& rows, VectorPtr* result) { + rawValues_ = values_->asMutable(); + getIntValues(rows, requestedType_, result); +} + +template +void SelectiveDecimalColumnReader::fillDecimals() { auto nullsPtr = resultNulls() ? resultNulls()->template as() : nullptr; auto scales = scaleBuffer_->as(); auto values = values_->asMutable(); - DecimalUtil::fillDecimals( values, nullsPtr, values, scales, numValues_, scale_); - - rawValues_ = values_->asMutable(); - getIntValues(rows, requestedType_, result); } template class SelectiveDecimalColumnReader; diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h index 67a82b051e36..4482ef47fc50 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h @@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader { private: template - void readHelper(RowSet rows); + void readHelper(common::Filter* filter, RowSet rows); + + // Process IsNull and IsNotNull filters. + void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls); + + // Process filters on decimal values. + void processFilter( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls); + + // Dispatch to the respective filter processing based on the filter type. + void process( + const common::Filter* filter, + const RowSet& rows, + const uint64_t* rawNulls); + + void fillDecimals(); std::unique_ptr> valueDecoder_; std::unique_ptr> scaleDecoder_; diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 572ba537c10f..947f7f22a6de 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -129,6 +129,7 @@ target_link_libraries( velox_aggregates velox_dwio_common velox_dwio_common_exception + velox_dwio_orc_reader velox_dwio_common_test_utils velox_dwio_parquet_reader velox_dwio_parquet_writer diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 92e65abe8e70..d4d963506838 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -34,6 +34,7 @@ #include "velox/connectors/hive/HivePartitionFunction.h" #include "velox/dwio/common/CacheInputStream.h" #include "velox/dwio/common/tests/utils/DataFiles.h" +#include "velox/dwio/orc/reader/OrcReader.h" #include "velox/exec/Cursor.h" #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" @@ -45,6 +46,7 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/expression/ExprToSubfieldFilter.h" +#include "velox/functions/lib/IsNull.h" #include "velox/type/Timestamp.h" #include "velox/type/Type.h" #include "velox/type/tests/SubfieldFiltersBuilder.h" @@ -78,6 +80,7 @@ class TableScanTest : public virtual HiveConnectorTestBase { HiveConnectorTestBase::SetUp(); exec::ExchangeSource::factories().clear(); exec::ExchangeSource::registerFactory(createLocalExchangeSource); + orc::registerOrcReaderFactory(); } static void SetUpTestCase() { @@ -1855,6 +1858,124 @@ TEST_F(TableScanTest, validFileNoData) { assertQuery(op, split, ""); } +TEST_F(TableScanTest, shortDecimalFilter) { + functions::registerIsNotNullFunction("isnotnull"); + + std::vector> values = { + 123456789123456789L, + 987654321123456L, + std::nullopt, + 2000000000000000L, + 5000000000000000L, + 987654321987654321L, + 100000000000000L, + 1230000000123456L, + 120000000123456L, + std::nullopt}; + auto rowVector = makeRowVector({ + makeNullableFlatVector(values, DECIMAL(18, 6)), + }); + createDuckDbTable({rowVector}); + + auto filePath = facebook::velox::test::getDataFilePath( + "velox/exec/tests", "data/short_decimal.orc"); + auto split = exec::test::HiveConnectorSplitBuilder(filePath) + .start(0) + .length(fs::file_size(filePath)) + .fileFormat(dwio::common::FileFormat::ORC) + .build(); + + auto rowType = ROW({"d"}, {DECIMAL(18, 6)}); + + // Is not null. + auto op = + PlanBuilder().tableScan(rowType, {}, "isnotnull(d)", rowType).planNode(); + assertQuery(op, split, "SELECT c0 FROM tmp where c0 is not null"); + + // Is null. + op = PlanBuilder().tableScan(rowType, {}, "is_null(d)", rowType).planNode(); + assertQuery(op, split, "SELECT c0 FROM tmp where c0 is null"); + + // BigintRange. + op = + PlanBuilder() + .tableScan( + rowType, + {}, + "d > 2000000000.0::DECIMAL(18, 6) and d < 6000000000.0::DECIMAL(18, 6)", + rowType) + .planNode(); + assertQuery( + op, + split, + "SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0"); + + // NegatedBigintRange. + op = + PlanBuilder() + .tableScan( + rowType, + {}, + "not(d between 2000000000.0::DECIMAL(18, 6) and 6000000000.0::DECIMAL(18, 6))", + rowType) + .planNode(); + assertQuery( + op, + split, + "SELECT c0 FROM tmp where c0 < 2000000000.0 or c0 > 6000000000.0"); +} + +TEST_F(TableScanTest, longDecimalFilter) { + functions::registerIsNotNullFunction("isnotnull"); + + std::vector> values = { + HugeInt::parse("123456789123456789123456789" + std::string(9, '0')), + HugeInt::parse("987654321123456789" + std::string(9, '0')), + std::nullopt, + HugeInt::parse("2" + std::string(37, '0')), + HugeInt::parse("5" + std::string(37, '0')), + HugeInt::parse("987654321987654321987654321" + std::string(9, '0')), + HugeInt::parse("1" + std::string(26, '0')), + HugeInt::parse("123000000012345678" + std::string(10, '0')), + HugeInt::parse("120000000123456789" + std::string(9, '0')), + HugeInt::parse("9" + std::string(37, '0'))}; + auto rowVector = makeRowVector({ + makeNullableFlatVector(values, DECIMAL(38, 18)), + }); + createDuckDbTable({rowVector}); + + auto filePath = facebook::velox::test::getDataFilePath( + "velox/exec/tests", "data/long_decimal.orc"); + auto split = exec::test::HiveConnectorSplitBuilder(filePath) + .start(0) + .length(fs::file_size(filePath)) + .fileFormat(dwio::common::FileFormat::ORC) + .build(); + + auto rowType = ROW({"d"}, {DECIMAL(38, 18)}); + auto op = + PlanBuilder().tableScan(rowType, {}, "isnotnull(d)", rowType).planNode(); + assertQuery(op, split, "SELECT c0 FROM tmp where c0 is not null"); + + // Is null. + op = PlanBuilder().tableScan(rowType, {}, "is_null(d)", rowType).planNode(); + assertQuery(op, split, "SELECT c0 FROM tmp where c0 is null"); + + // HugeintRange. + op = + PlanBuilder() + .tableScan( + rowType, + {}, + "d > 2000000000.0::DECIMAL(38, 18) and d < 6000000000.0::DECIMAL(38, 18)", + rowType) + .planNode(); + assertQuery( + op, + split, + "SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0"); +} + // An invalid (size = 0) file. TEST_F(TableScanTest, emptyFile) { auto filePath = TempFilePath::create(); diff --git a/velox/exec/tests/data/long_decimal.orc b/velox/exec/tests/data/long_decimal.orc new file mode 100644 index 0000000000000000000000000000000000000000..f732246b469aab0ccddbf80fbdfb98ecf6adc79a GIT binary patch literal 485 zcmeYdau#G@;9?VE;ou5js9|6T7vKuz;$#qHR1xyv;52gL;xshiW#kYNvE=2_V`LDJ zk~cIn6l4rBHZ`#@H#0RcHqtY(Fa#kpivUJ;28Mt453FZkSoLSof%ngEJbwE9`$L{p zce=N~oc?x;bVEbK-bGD|`&P3wG#r?5?dbK#uelzbIK!wSX^ z5gg0{9~h;$fRv9!o`SIkml6jDj}RlH6l01K8wa0;nt_m(B9}ydQMz7YL1J=7s$OwH zVo|nUS!z*nW`3R!tFfN3o}mGk71$|OaHkk>GJ!&ylhX(h+9s%OVPy6J1$IQ9!vsGM nh6u(aRzp2=Ju{A(i~>v&4GjWHObk34&IX)KUznNwgPbJ*j+A$+ literal 0 HcmV?d00001 diff --git a/velox/exec/tests/data/short_decimal.orc b/velox/exec/tests/data/short_decimal.orc new file mode 100644 index 0000000000000000000000000000000000000000..d442711fc750540744f81c1bb561d2ecf180f0af GIT binary patch literal 429 zcmeYdau#G@;9?VE;ou5jh+$wb72q=F;$#qHR1s3+;4o6);xshiWsDFKwzM!eGc_?b zG6XX8AglmJb_Rxj_Z#vV7*_phI{Wp~*=@`XhmQ8QO+3ilaAD17zJ?R)7pOiuwc!2x z>2J44H?*veZ|Gb$aog(kEDfFA*S?-G<7HrQV?V=i;7lUO5gcp+3=-U6no+_KNHYj9 zO0WTGMj&kkq*;M9rw|hd0}wOSFfe$yaJg}TUCw0(bgUV~u@-2K<>Zo(5-~J1v@{Tu z($F(9HZe6b3t)`!)M8|q!}uYBgIQoVqZAjAvXSsqFxKEw;^5#BVq}zJOi^Ov;L{K? zVAE3MlE^Pg*Gnu&OwLHvD=tVZ%GN7OEh^5;&l6%b)-%>KG~luVJH!g^5Ccvod@e~q ta*4wPKM#fo#w1olJz&^!%wQB?l4xiUP-0@>(Qq{2Z2H2?>>uPT0RTfgUfTcw literal 0 HcmV?d00001 From 64ba08eb4e5a8de2a41d17bce1a8ea24ecd54758 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Thu, 20 Feb 2025 14:59:48 +0000 Subject: [PATCH 2/4] Add e2e test --- velox/dwio/dwrf/reader/ReaderBase.cpp | 14 ++-- velox/dwio/dwrf/test/E2EFilterTest.cpp | 86 ++++++++++++++++++++++--- velox/dwio/dwrf/utils/ProtoUtils.cpp | 19 +++++- velox/dwio/dwrf/writer/ColumnWriter.cpp | 28 ++++++-- velox/dwio/dwrf/writer/Writer.cpp | 7 +- 5 files changed, 133 insertions(+), 21 deletions(-) diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index 71dbc2e41510..16bd779b500a 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -343,13 +343,19 @@ std::shared_ptr ReaderBase::convertType( return SMALLINT(); case TypeKind::INTEGER: return INTEGER(); - case TypeKind::BIGINT: + case TypeKind::BIGINT: { + TypePtr converted; if (type.format() == DwrfFormat::kOrc && type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) { - return DECIMAL( - type.getOrcPtr()->precision(), type.getOrcPtr()->scale()); + converted = + DECIMAL(type.getOrcPtr()->precision(), type.getOrcPtr()->scale()); + } else { + converted = BIGINT(); + common::testutil::TestValue::adjust( + "facebook::velox::dwrf::ReaderBase::convertType", &converted); } - return BIGINT(); + return converted; + } case TypeKind::HUGEINT: if (type.format() == DwrfFormat::kOrc && type.getOrcPtr()->kind() == proto::orc::Type_Kind_DECIMAL) { diff --git a/velox/dwio/dwrf/test/E2EFilterTest.cpp b/velox/dwio/dwrf/test/E2EFilterTest.cpp index 43b67e91e550..c3790c8556fb 100644 --- a/velox/dwio/dwrf/test/E2EFilterTest.cpp +++ b/velox/dwio/dwrf/test/E2EFilterTest.cpp @@ -15,6 +15,7 @@ */ #include "velox/common/base/Portability.h" +#include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/tests/utils/E2EFilterTestBase.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" @@ -64,11 +65,11 @@ class E2EFilterTest : public E2EFilterTestBase { const TypePtr& type, const std::vector& batches, bool forRowGroupSkip = false) override { - auto options = createWriterOptions(type); + setWriterOptions(type); int32_t flushCounter = 0; // If we test row group skip, we have all the data in one stripe. For // scan, we start a stripe every 'flushEveryNBatches_' batches. - options.flushPolicyFactory = [&]() { + options_.flushPolicyFactory = [&]() { return std::make_unique([&]() { return forRowGroupSkip ? false : (++flushCounter % flushEveryNBatches_ == 0); @@ -80,8 +81,8 @@ class E2EFilterTest : public E2EFilterTestBase { dwio::common::FileSink::Options{.pool = leafPool_.get()}); ASSERT_TRUE(sink->isBuffered()); auto* sinkPtr = sink.get(); - options.memoryPool = rootPool_.get(); - writer_ = std::make_unique(std::move(sink), options); + options_.memoryPool = rootPool_.get(); + writer_ = std::make_unique(std::move(sink), options_); for (auto& batch : batches) { writer_->write(batch); } @@ -105,9 +106,10 @@ class E2EFilterTest : public E2EFilterTestBase { } std::unordered_set flatMapColumns_; + dwrf::WriterOptions options_; private: - dwrf::WriterOptions createWriterOptions(const TypePtr& type) { + void setWriterOptions(const TypePtr& type) { auto config = std::make_shared(); config->set(dwrf::Config::COMPRESSION, CompressionKind_NONE); config->set(dwrf::Config::USE_VINTS, useVInts_); @@ -148,10 +150,8 @@ class E2EFilterTest : public E2EFilterTestBase { config->set>>( dwrf::Config::MAP_FLAT_COLS_STRUCT_KEYS, mapFlatColsStructKeys); } - dwrf::WriterOptions options; - options.config = config; - options.schema = writerSchema; - return options; + options_.config = config; + options_.schema = writerSchema; } std::unique_ptr writer_; @@ -227,6 +227,74 @@ TEST_F(E2EFilterTest, byteRle) { 20); } +DEBUG_ONLY_TEST_F(E2EFilterTest, shortDecimal) { + testutil::TestValue::enable(); + options_.format = DwrfFormat::kOrc; + const std::unordered_map types = { + {"shortdecimal_val:decimal(8, 5)", DECIMAL(8, 5)}, + {"shortdecimal_val:decimal(10, 5)", DECIMAL(10, 5)}, + {"shortdecimal_val:decimal(17, 5)", DECIMAL(17, 5)}}; + + for (const auto& pair : types) { + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::ReaderBase::convertType", + std::function( + [&](TypePtr* type) { *type = pair.second; })); + testWithTypes( + pair.first, + [&]() { + makeIntDistribution( + "shortdecimal_val", + 10, // min + 100, // max + 22, // repeats + 19, // rareFrequency + -999, // rareMin + 30000, // rareMax + true); + }, + false, + {"shortdecimal_val"}, + 20); + } + options_.format = DwrfFormat::kDwrf; +} + +DEBUG_ONLY_TEST_F(E2EFilterTest, longDecimal) { + testutil::TestValue::enable(); + options_.format = DwrfFormat::kOrc; + const std::unordered_map types = { + {"longdecimal_val:decimal(30, 10)", DECIMAL(30, 10)}, + {"longdecimal_val:decimal(37, 15)", DECIMAL(37, 15)}}; + + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::ProtoUtils::writeType", + std::function([&](bool* kindSet) { *kindSet = true; })); + for (const auto& pair : types) { + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::ReaderBase::convertType", + std::function( + [&](TypePtr* type) { *type = pair.second; })); + testWithTypes( + pair.first, + [&]() { + makeIntDistribution( + "longdecimal_val", + 10, // min + 100, // max + 22, // repeats + 19, // rareFrequency + -999, // rareMin + 30000, // rareMax + true); + }, + true, + {"longdecimal_val"}, + 20); + } + options_.format = DwrfFormat::kDwrf; +} + TEST_F(E2EFilterTest, floatAndDouble) { testWithTypes( "float_val:float," diff --git a/velox/dwio/dwrf/utils/ProtoUtils.cpp b/velox/dwio/dwrf/utils/ProtoUtils.cpp index 405d2e79ddfb..e907c4126d3e 100644 --- a/velox/dwio/dwrf/utils/ProtoUtils.cpp +++ b/velox/dwio/dwrf/utils/ProtoUtils.cpp @@ -57,9 +57,22 @@ void ProtoUtils::writeType( if (parent) { parent->add_subtypes(footer.types_size() - 1); } - auto kind = - VELOX_STATIC_FIELD_DYNAMIC_DISPATCH(SchemaType, kind, type.kind()); - self->set_kind(kind); + bool kindSet = false; + if (type.kind() == TypeKind::HUGEINT) { + // Hugeint is not supported by DWRF, and this branch is only for ORC + // testing before the ORC footer write is implemented. + auto kind = SchemaType::kind; + self->set_kind(kind); + common::testutil::TestValue::adjust( + "facebook::velox::dwrf::ProtoUtils::writeType", &kindSet); + } else { + auto kind = + VELOX_STATIC_FIELD_DYNAMIC_DISPATCH(SchemaType, kind, type.kind()); + self->set_kind(kind); + kindSet = true; + } + VELOX_CHECK(kindSet, "Unknown type {}.", type.toString()); + switch (type.kind()) { case TypeKind::ROW: { auto& row = type.asRow(); diff --git a/velox/dwio/dwrf/writer/ColumnWriter.cpp b/velox/dwio/dwrf/writer/ColumnWriter.cpp index 2a4cf2077961..a1084f4d6104 100644 --- a/velox/dwio/dwrf/writer/ColumnWriter.cpp +++ b/velox/dwio/dwrf/writer/ColumnWriter.cpp @@ -2183,7 +2183,12 @@ std::unique_ptr BaseColumnWriter::create( context, type, sequence, onRecordPosition); ret->children_.reserve(type.size()); for (int32_t i = 0; i < type.size(); ++i) { - ret->children_.push_back(create(context, *type.childAt(i), sequence)); + ret->children_.push_back(create( + context, + *type.childAt(i), + sequence, + /*onRecordPosition=*/nullptr, + format)); } return ret; } @@ -2199,15 +2204,30 @@ std::unique_ptr BaseColumnWriter::create( } auto ret = std::make_unique( context, type, sequence, onRecordPosition); - ret->children_.push_back(create(context, *type.childAt(0), sequence)); - ret->children_.push_back(create(context, *type.childAt(1), sequence)); + ret->children_.push_back(create( + context, + *type.childAt(0), + sequence, + /*onRecordPosition=*/nullptr, + format)); + ret->children_.push_back(create( + context, + *type.childAt(1), + sequence, + /*onRecordPosition=*/nullptr, + format)); return ret; } case TypeKind::ARRAY: { VELOX_CHECK_EQ(type.size(), 1, "Array should have exactly one child"); auto ret = std::make_unique( context, type, sequence, onRecordPosition); - ret->children_.push_back(create(context, *type.childAt(0), sequence)); + ret->children_.push_back(create( + context, + *type.childAt(0), + sequence, + /*onRecordPosition=*/nullptr, + format)); return ret; } default: diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index d6011c38f8de..b5af93a2cc1b 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -200,7 +200,12 @@ Writer::Writer( } if (options.columnWriterFactory == nullptr) { - writer_ = BaseColumnWriter::create(writerBase_->getContext(), *schema_); + writer_ = BaseColumnWriter::create( + writerBase_->getContext(), + *schema_, + /*sequence=*/0, + /*onRecordPosition=*/nullptr, + options.format); } else { writer_ = options.columnWriterFactory(writerBase_->getContext(), *schema_); } From 12e2a49556a958ce96cdd42705d65e81d4f62e69 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Fri, 21 Feb 2025 11:15:56 +0000 Subject: [PATCH 3/4] Remove table scan test --- velox/dwio/dwrf/test/E2EFilterTest.cpp | 2 +- velox/exec/tests/CMakeLists.txt | 1 - velox/exec/tests/TableScanTest.cpp | 121 ------------------------ velox/exec/tests/data/long_decimal.orc | Bin 485 -> 0 bytes velox/exec/tests/data/short_decimal.orc | Bin 429 -> 0 bytes 5 files changed, 1 insertion(+), 123 deletions(-) delete mode 100644 velox/exec/tests/data/long_decimal.orc delete mode 100644 velox/exec/tests/data/short_decimal.orc diff --git a/velox/dwio/dwrf/test/E2EFilterTest.cpp b/velox/dwio/dwrf/test/E2EFilterTest.cpp index c3790c8556fb..6a56da80e891 100644 --- a/velox/dwio/dwrf/test/E2EFilterTest.cpp +++ b/velox/dwio/dwrf/test/E2EFilterTest.cpp @@ -288,7 +288,7 @@ DEBUG_ONLY_TEST_F(E2EFilterTest, longDecimal) { 30000, // rareMax true); }, - true, + false, {"longdecimal_val"}, 20); } diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 947f7f22a6de..572ba537c10f 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -129,7 +129,6 @@ target_link_libraries( velox_aggregates velox_dwio_common velox_dwio_common_exception - velox_dwio_orc_reader velox_dwio_common_test_utils velox_dwio_parquet_reader velox_dwio_parquet_writer diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index d4d963506838..92e65abe8e70 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -34,7 +34,6 @@ #include "velox/connectors/hive/HivePartitionFunction.h" #include "velox/dwio/common/CacheInputStream.h" #include "velox/dwio/common/tests/utils/DataFiles.h" -#include "velox/dwio/orc/reader/OrcReader.h" #include "velox/exec/Cursor.h" #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" @@ -46,7 +45,6 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/expression/ExprToSubfieldFilter.h" -#include "velox/functions/lib/IsNull.h" #include "velox/type/Timestamp.h" #include "velox/type/Type.h" #include "velox/type/tests/SubfieldFiltersBuilder.h" @@ -80,7 +78,6 @@ class TableScanTest : public virtual HiveConnectorTestBase { HiveConnectorTestBase::SetUp(); exec::ExchangeSource::factories().clear(); exec::ExchangeSource::registerFactory(createLocalExchangeSource); - orc::registerOrcReaderFactory(); } static void SetUpTestCase() { @@ -1858,124 +1855,6 @@ TEST_F(TableScanTest, validFileNoData) { assertQuery(op, split, ""); } -TEST_F(TableScanTest, shortDecimalFilter) { - functions::registerIsNotNullFunction("isnotnull"); - - std::vector> values = { - 123456789123456789L, - 987654321123456L, - std::nullopt, - 2000000000000000L, - 5000000000000000L, - 987654321987654321L, - 100000000000000L, - 1230000000123456L, - 120000000123456L, - std::nullopt}; - auto rowVector = makeRowVector({ - makeNullableFlatVector(values, DECIMAL(18, 6)), - }); - createDuckDbTable({rowVector}); - - auto filePath = facebook::velox::test::getDataFilePath( - "velox/exec/tests", "data/short_decimal.orc"); - auto split = exec::test::HiveConnectorSplitBuilder(filePath) - .start(0) - .length(fs::file_size(filePath)) - .fileFormat(dwio::common::FileFormat::ORC) - .build(); - - auto rowType = ROW({"d"}, {DECIMAL(18, 6)}); - - // Is not null. - auto op = - PlanBuilder().tableScan(rowType, {}, "isnotnull(d)", rowType).planNode(); - assertQuery(op, split, "SELECT c0 FROM tmp where c0 is not null"); - - // Is null. - op = PlanBuilder().tableScan(rowType, {}, "is_null(d)", rowType).planNode(); - assertQuery(op, split, "SELECT c0 FROM tmp where c0 is null"); - - // BigintRange. - op = - PlanBuilder() - .tableScan( - rowType, - {}, - "d > 2000000000.0::DECIMAL(18, 6) and d < 6000000000.0::DECIMAL(18, 6)", - rowType) - .planNode(); - assertQuery( - op, - split, - "SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0"); - - // NegatedBigintRange. - op = - PlanBuilder() - .tableScan( - rowType, - {}, - "not(d between 2000000000.0::DECIMAL(18, 6) and 6000000000.0::DECIMAL(18, 6))", - rowType) - .planNode(); - assertQuery( - op, - split, - "SELECT c0 FROM tmp where c0 < 2000000000.0 or c0 > 6000000000.0"); -} - -TEST_F(TableScanTest, longDecimalFilter) { - functions::registerIsNotNullFunction("isnotnull"); - - std::vector> values = { - HugeInt::parse("123456789123456789123456789" + std::string(9, '0')), - HugeInt::parse("987654321123456789" + std::string(9, '0')), - std::nullopt, - HugeInt::parse("2" + std::string(37, '0')), - HugeInt::parse("5" + std::string(37, '0')), - HugeInt::parse("987654321987654321987654321" + std::string(9, '0')), - HugeInt::parse("1" + std::string(26, '0')), - HugeInt::parse("123000000012345678" + std::string(10, '0')), - HugeInt::parse("120000000123456789" + std::string(9, '0')), - HugeInt::parse("9" + std::string(37, '0'))}; - auto rowVector = makeRowVector({ - makeNullableFlatVector(values, DECIMAL(38, 18)), - }); - createDuckDbTable({rowVector}); - - auto filePath = facebook::velox::test::getDataFilePath( - "velox/exec/tests", "data/long_decimal.orc"); - auto split = exec::test::HiveConnectorSplitBuilder(filePath) - .start(0) - .length(fs::file_size(filePath)) - .fileFormat(dwio::common::FileFormat::ORC) - .build(); - - auto rowType = ROW({"d"}, {DECIMAL(38, 18)}); - auto op = - PlanBuilder().tableScan(rowType, {}, "isnotnull(d)", rowType).planNode(); - assertQuery(op, split, "SELECT c0 FROM tmp where c0 is not null"); - - // Is null. - op = PlanBuilder().tableScan(rowType, {}, "is_null(d)", rowType).planNode(); - assertQuery(op, split, "SELECT c0 FROM tmp where c0 is null"); - - // HugeintRange. - op = - PlanBuilder() - .tableScan( - rowType, - {}, - "d > 2000000000.0::DECIMAL(38, 18) and d < 6000000000.0::DECIMAL(38, 18)", - rowType) - .planNode(); - assertQuery( - op, - split, - "SELECT c0 FROM tmp where c0 > 2000000000.0 and c0 < 6000000000.0"); -} - // An invalid (size = 0) file. TEST_F(TableScanTest, emptyFile) { auto filePath = TempFilePath::create(); diff --git a/velox/exec/tests/data/long_decimal.orc b/velox/exec/tests/data/long_decimal.orc deleted file mode 100644 index f732246b469aab0ccddbf80fbdfb98ecf6adc79a..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 485 zcmeYdau#G@;9?VE;ou5js9|6T7vKuz;$#qHR1xyv;52gL;xshiW#kYNvE=2_V`LDJ zk~cIn6l4rBHZ`#@H#0RcHqtY(Fa#kpivUJ;28Mt453FZkSoLSof%ngEJbwE9`$L{p zce=N~oc?x;bVEbK-bGD|`&P3wG#r?5?dbK#uelzbIK!wSX^ z5gg0{9~h;$fRv9!o`SIkml6jDj}RlH6l01K8wa0;nt_m(B9}ydQMz7YL1J=7s$OwH zVo|nUS!z*nW`3R!tFfN3o}mGk71$|OaHkk>GJ!&ylhX(h+9s%OVPy6J1$IQ9!vsGM nh6u(aRzp2=Ju{A(i~>v&4GjWHObk34&IX)KUznNwgPbJ*j+A$+ diff --git a/velox/exec/tests/data/short_decimal.orc b/velox/exec/tests/data/short_decimal.orc deleted file mode 100644 index d442711fc750540744f81c1bb561d2ecf180f0af..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 429 zcmeYdau#G@;9?VE;ou5jh+$wb72q=F;$#qHR1s3+;4o6);xshiWsDFKwzM!eGc_?b zG6XX8AglmJb_Rxj_Z#vV7*_phI{Wp~*=@`XhmQ8QO+3ilaAD17zJ?R)7pOiuwc!2x z>2J44H?*veZ|Gb$aog(kEDfFA*S?-G<7HrQV?V=i;7lUO5gcp+3=-U6no+_KNHYj9 zO0WTGMj&kkq*;M9rw|hd0}wOSFfe$yaJg}TUCw0(bgUV~u@-2K<>Zo(5-~J1v@{Tu z($F(9HZe6b3t)`!)M8|q!}uYBgIQoVqZAjAvXSsqFxKEw;^5#BVq}zJOi^Ov;L{K? zVAE3MlE^Pg*Gnu&OwLHvD=tVZ%GN7OEh^5;&l6%b)-%>KG~luVJH!g^5Ccvod@e~q ta*4wPKM#fo#w1olJz&^!%wQB?l4xiUP-0@>(Qq{2Z2H2?>>uPT0RTfgUfTcw From f61dc1f1f7cb016dc09cd86e98c12cbf26443a82 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Mon, 24 Feb 2025 16:20:14 +0000 Subject: [PATCH 4/4] Fix --- velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp index 1a3d369384bf..644bc07de1c9 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp @@ -261,6 +261,13 @@ void SelectiveDecimalColumnReader::read( const uint64_t* incomingNulls) { VELOX_CHECK(!scanSpec_->valueHook()); prepareRead(offset, rows, incomingNulls); + if (!resultNulls_ || !resultNulls_->unique() || + resultNulls_->capacity() * 8 < rows.size()) { + // Make sure a dedicated resultNulls_ is allocated with enough capacity as + // RleDecoder always assumes it is available. + resultNulls_ = AlignedBuffer::allocate(rows.size(), memoryPool_); + rawResultNulls_ = resultNulls_->asMutable(); + } bool isDense = rows.back() == rows.size() - 1; if (isDense) { readHelper(scanSpec_->filter(), rows);