diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index ff8aa8a9c27e..10ecc1c4ba14 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -12,15 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_dwio_parquet_reader_test ParquetReaderTest.cpp) -add_test( - NAME velox_dwio_parquet_reader_test - COMMAND velox_dwio_parquet_reader_test - WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) -target_link_libraries( - velox_dwio_parquet_reader_test velox_dwio_native_parquet_reader - velox_link_libs ${TEST_LINK_LIBS}) - add_executable(velox_dwio_parquet_page_reader_test ParquetPageReaderTest.cpp) add_test( NAME velox_dwio_parquet_page_reader_test @@ -43,16 +34,40 @@ target_link_libraries( ZLIB::ZLIB ${TEST_LINK_LIBS}) +add_library(velox_dwio_parquet_reader_benchmark_lib ParquetReaderBenchmark.h) +target_link_libraries( + velox_dwio_parquet_reader_benchmark_lib + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_exec_test_lib + velox_exec + velox_hive_connector + Folly::folly + ${FOLLY_BENCHMARK} + ${TEST_LINK_LIBS}) + add_executable(velox_dwio_parquet_reader_benchmark ParquetReaderBenchmark.cpp) target_link_libraries( velox_dwio_parquet_reader_benchmark + velox_dwio_parquet_reader_benchmark_lib velox_dwio_parquet_reader velox_dwio_parquet_writer velox_exec_test_lib velox_exec velox_hive_connector Folly::folly - ${FOLLY_BENCHMARK}) + ${FOLLY_BENCHMARK} + ${TEST_LINK_LIBS}) + +add_executable(velox_dwio_parquet_reader_test ParquetReaderTest.cpp + ParquetReaderBenchmarkTest.cpp) +add_test( + NAME velox_dwio_parquet_reader_test + COMMAND velox_dwio_parquet_reader_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries( + velox_dwio_parquet_reader_test velox_dwio_native_parquet_reader + velox_dwio_parquet_reader_benchmark_lib velox_link_libs 
${TEST_LINK_LIBS}) add_executable(velox_dwio_parquet_structure_decoder_test NestedStructureDecoderTest.cpp) diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp index a15a1060f714..c255fefe889c 100644 --- a/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp @@ -14,17 +14,7 @@ * limitations under the License. */ -#include "velox/dwio/common/FileSink.h" -#include "velox/dwio/common/Options.h" -#include "velox/dwio/common/Statistics.h" -#include "velox/dwio/common/tests/utils/DataSetBuilder.h" -#include "velox/dwio/parquet/RegisterParquetReader.h" -#include "velox/dwio/parquet/reader/ParquetReader.h" -#include "velox/dwio/parquet/writer/Writer.h" -#include "velox/exec/tests/utils/TempDirectoryPath.h" - -#include -#include +#include "velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.h" using namespace facebook::velox; using namespace facebook::velox::dwio; @@ -32,254 +22,6 @@ using namespace facebook::velox::dwio::common; using namespace facebook::velox::parquet; using namespace facebook::velox::test; -const uint32_t kNumRowsPerBatch = 60000; -const uint32_t kNumBatches = 50; -const uint32_t kNumRowsPerRowGroup = 10000; -const double kFilterErrorMargin = 0.2; - -class ParquetReaderBenchmark { - public: - explicit ParquetReaderBenchmark( - bool disableDictionary, - const RowTypePtr& rowType) - : disableDictionary_(disableDictionary) { - rootPool_ = memory::memoryManager()->addRootPool("ParquetReaderBenchmark"); - leafPool_ = rootPool_->addLeafChild("ParquetReaderBenchmark"); - dataSetBuilder_ = std::make_unique(*leafPool_, 0); - auto path = fileFolder_->path + "/" + fileName_; - auto localWriteFile = std::make_unique(path, true, false); - auto sink = - std::make_unique(std::move(localWriteFile), path); - facebook::velox::parquet::WriterOptions options; - if (disableDictionary_) { - // The parquet file is in plain 
encoding format. - options.enableDictionary = false; - } - options.memoryPool = rootPool_.get(); - writer_ = std::make_unique( - std::move(sink), options, rowType); - } - - ~ParquetReaderBenchmark() {} - - void writeToFile( - const std::vector& batches, - bool /*forRowGroupSkip*/) { - for (auto& batch : batches) { - writer_->write(batch); - } - writer_->flush(); - writer_->close(); - } - - FilterSpec createFilterSpec( - const std::string& columnName, - float startPct, - float selectPct, - const TypePtr& type, - bool isForRowGroupSkip, - bool allowNulls) { - switch (type->childAt(0)->kind()) { - case TypeKind::BIGINT: - case TypeKind::INTEGER: - return FilterSpec( - columnName, - startPct, - selectPct, - FilterKind::kBigintRange, - isForRowGroupSkip, - allowNulls); - case TypeKind::DOUBLE: - return FilterSpec( - columnName, - startPct, - selectPct, - FilterKind::kDoubleRange, - isForRowGroupSkip, - allowNulls); - case TypeKind::HUGEINT: - return FilterSpec( - columnName, - startPct, - selectPct, - FilterKind::kHugeintRange, - isForRowGroupSkip, - allowNulls); - case TypeKind::VARCHAR: - return FilterSpec( - columnName, - startPct, - selectPct, - FilterKind::kBytesRange, - isForRowGroupSkip, - allowNulls); - default: - VELOX_FAIL("Unsupported Data Type {}", type->childAt(0)->toString()); - } - return FilterSpec(columnName, startPct, selectPct, FilterKind(), false); - } - - std::shared_ptr createScanSpec( - const std::vector& batches, - RowTypePtr& rowType, - const std::vector& filterSpecs, - std::vector& hitRows) { - std::unique_ptr filterGenerator = - std::make_unique(rowType, 0); - auto filters = filterGenerator->makeSubfieldFilters( - filterSpecs, batches, nullptr, hitRows); - auto scanSpec = filterGenerator->makeScanSpec(std::move(filters)); - return scanSpec; - } - - std::unique_ptr createReader( - std::shared_ptr scanSpec, - const RowTypePtr& rowType) { - dwio::common::ReaderOptions readerOpts{leafPool_.get()}; - auto input = std::make_unique( - 
std::make_shared(fileFolder_->path + "/" + fileName_), - readerOpts.getMemoryPool()); - - std::unique_ptr reader = - std::make_unique(std::move(input), readerOpts); - - dwio::common::RowReaderOptions rowReaderOpts; - rowReaderOpts.select( - std::make_shared( - rowType, rowType->names())); - rowReaderOpts.setScanSpec(scanSpec); - auto rowReader = reader->createRowReader(rowReaderOpts); - - return rowReader; - } - - // This method is the place where we do the read opeartions. - // scanSpec contains the setting of filters. e.g. - // filterRateX100 = 30 means it would filter out 70% of rows and 30% remain. - // nullsRateX100 = 70 means it would filter out 70% of rows and 30% remain. - // Return the number of rows after the filter and null-filter. - int read( - const RowTypePtr& rowType, - std::shared_ptr scanSpec, - uint32_t nextSize) { - auto rowReader = createReader(scanSpec, rowType); - runtimeStats_ = dwio::common::RuntimeStatistics(); - - rowReader->resetFilterCaches(); - auto result = BaseVector::create(rowType, 1, leafPool_.get()); - int resultSize = 0; - while (true) { - bool hasData = rowReader->next(nextSize, result); - - if (!hasData) { - break; - } - if (result->size() == 0) { - continue; - } - - auto rowVector = result->asUnchecked(); - for (auto i = 0; i < rowVector->childrenSize(); ++i) { - rowVector->childAt(i)->loadedVector(); - } - - VELOX_CHECK_EQ( - rowVector->childrenSize(), - 1, - "The benchmark is performed on single columns. 
So the result should only contain one column.") - - for (int i = 0; i < rowVector->size(); i++) { - resultSize += !rowVector->childAt(0)->isNullAt(i); - } - } - - rowReader->updateRuntimeStats(runtimeStats_); - return resultSize; - } - - void readSingleColumn( - const std::string& columnName, - const TypePtr& type, - float startPct, - float selectPct, - uint8_t nullsRateX100, - uint32_t nextSize) { - folly::BenchmarkSuspender suspender; - - auto rowType = ROW({columnName}, {type}); - // Generating the data (consider the null rate). - auto batches = - dataSetBuilder_->makeDataset(rowType, kNumBatches, kNumRowsPerBatch) - .withRowGroupSpecificData(kNumRowsPerRowGroup) - .withNullsForField(Subfield(columnName), nullsRateX100) - .build(); - writeToFile(*batches, true); - std::vector filterSpecs; - - // Filters on List and Map are not supported currently. - if (type->kind() != TypeKind::ARRAY && type->kind() != TypeKind::MAP) { - filterSpecs.emplace_back(createFilterSpec( - columnName, startPct, selectPct, rowType, false, false)); - } - - std::vector hitRows; - auto scanSpec = createScanSpec(*batches, rowType, filterSpecs, hitRows); - - suspender.dismiss(); - - // Filter range is generated from a small sample data of 4096 rows. So the - // upperBound and lowerBound are introduced to estimate the result size. - auto resultSize = read(rowType, scanSpec, nextSize); - - // Calculate the expected number of rows after the filters. - // Add one to expected to avoid 0 in calculating upperBound and lowerBound. - int expected = kNumBatches * kNumRowsPerBatch * - (1 - (double)nullsRateX100 / 100) * ((double)selectPct / 100) + - 1; - - // Make the upperBound and lowerBound large enough to avoid very small - // resultSize and expected size, where the diff ratio is relatively very - // large. 
- int upperBound = expected * (1 + kFilterErrorMargin) + 1; - int lowerBound = expected * (1 - kFilterErrorMargin) - 1; - upperBound = std::max(16, upperBound); - lowerBound = std::max(0, lowerBound); - - VELOX_CHECK( - resultSize <= upperBound && resultSize >= lowerBound, - "Result Size {} and Expected Size {} Mismatch", - resultSize, - expected); - } - - private: - const std::string fileName_ = "test.parquet"; - const std::shared_ptr - fileFolder_ = facebook::velox::exec::test::TempDirectoryPath::create(); - const bool disableDictionary_; - - std::unique_ptr dataSetBuilder_; - std::shared_ptr rootPool_; - std::shared_ptr leafPool_; - std::unique_ptr writer_; - RuntimeStatistics runtimeStats_; -}; - -void run( - uint32_t, - const std::string& columnName, - const TypePtr& type, - float filterRateX100, - uint8_t nullsRateX100, - uint32_t nextSize, - bool disableDictionary) { - RowTypePtr rowType = ROW({columnName}, {type}); - ParquetReaderBenchmark benchmark(disableDictionary, rowType); - BIGINT()->toString(); - benchmark.readSingleColumn( - columnName, type, 0, filterRateX100, nullsRateX100, nextSize); -} - #define PARQUET_BENCHMARKS_FILTER_NULLS(_type_, _name_, _filter_, _null_) \ BENCHMARK_NAMED_PARAM( \ run, \ diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.h b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.h new file mode 100644 index 000000000000..95ee805451c8 --- /dev/null +++ b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.h @@ -0,0 +1,283 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/FileSink.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/common/Statistics.h" +#include "velox/dwio/common/tests/utils/DataSetBuilder.h" +#include "velox/dwio/parquet/RegisterParquetReader.h" +#include "velox/dwio/parquet/reader/ParquetReader.h" +#include "velox/dwio/parquet/writer/Writer.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::dwio; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::parquet; +using namespace facebook::velox::test; + +const uint32_t kNumRowsPerBatch = 60000; +const uint32_t kNumBatches = 50; +const uint32_t kNumRowsPerRowGroup = 10000; +const double kFilterErrorMargin = 0.2; + +class ParquetReaderBenchmark { + public: + explicit ParquetReaderBenchmark( + bool disableDictionary, + const RowTypePtr& rowType) + : disableDictionary_(disableDictionary) { + rootPool_ = memory::memoryManager()->addRootPool("ParquetReaderBenchmark"); + leafPool_ = rootPool_->addLeafChild("ParquetReaderBenchmark"); + dataSetBuilder_ = std::make_unique(*leafPool_, 0); + auto path = fileFolder_->path + "/" + fileName_; + auto localWriteFile = std::make_unique(path, true, false); + auto sink = + std::make_unique(std::move(localWriteFile), path); + facebook::velox::parquet::WriterOptions options; + if (disableDictionary_) { + // The parquet file is in plain encoding format. 
+ options.enableDictionary = false; + } + options.memoryPool = rootPool_.get(); + writer_ = std::make_unique( + std::move(sink), options, rowType); + } + + ~ParquetReaderBenchmark() {} + + void writeToFile( + const std::vector& batches, + bool /*forRowGroupSkip*/) { + for (auto& batch : batches) { + writer_->write(batch); + } + writer_->flush(); + writer_->close(); + } + + FilterSpec createFilterSpec( + const std::string& columnName, + float startPct, + float selectPct, + const TypePtr& type, + bool isForRowGroupSkip, + bool allowNulls) { + switch (type->childAt(0)->kind()) { + case TypeKind::BIGINT: + case TypeKind::INTEGER: + return FilterSpec( + columnName, + startPct, + selectPct, + FilterKind::kBigintRange, + isForRowGroupSkip, + allowNulls); + case TypeKind::DOUBLE: + return FilterSpec( + columnName, + startPct, + selectPct, + FilterKind::kDoubleRange, + isForRowGroupSkip, + allowNulls); + case TypeKind::HUGEINT: + return FilterSpec( + columnName, + startPct, + selectPct, + FilterKind::kHugeintRange, + isForRowGroupSkip, + allowNulls); + case TypeKind::VARCHAR: + return FilterSpec( + columnName, + startPct, + selectPct, + FilterKind::kBytesRange, + isForRowGroupSkip, + allowNulls); + default: + VELOX_FAIL("Unsupported Data Type {}", type->childAt(0)->toString()); + } + return FilterSpec(columnName, startPct, selectPct, FilterKind(), false); + } + + std::shared_ptr createScanSpec( + const std::vector& batches, + RowTypePtr& rowType, + const std::vector& filterSpecs, + std::vector& hitRows) { + std::unique_ptr filterGenerator = + std::make_unique(rowType, 0); + auto filters = filterGenerator->makeSubfieldFilters( + filterSpecs, batches, nullptr, hitRows); + auto scanSpec = filterGenerator->makeScanSpec(std::move(filters)); + return scanSpec; + } + + std::unique_ptr createReader( + std::shared_ptr scanSpec, + const RowTypePtr& rowType) { + dwio::common::ReaderOptions readerOpts{leafPool_.get()}; + auto input = std::make_unique( + 
std::make_shared(fileFolder_->path + "/" + fileName_), + readerOpts.getMemoryPool()); + + std::unique_ptr reader = + std::make_unique(std::move(input), readerOpts); + + dwio::common::RowReaderOptions rowReaderOpts; + rowReaderOpts.select( + std::make_shared( + rowType, rowType->names())); + rowReaderOpts.setScanSpec(scanSpec); + auto rowReader = reader->createRowReader(rowReaderOpts); + + return rowReader; + } + + // This method is the place where we do the read opeartions. + // scanSpec contains the setting of filters. e.g. + // filterRateX100 = 30 means it would filter out 70% of rows and 30% remain. + // nullsRateX100 = 70 means it would filter out 70% of rows and 30% remain. + // Return the number of rows after the filter and null-filter. + int read( + const RowTypePtr& rowType, + std::shared_ptr scanSpec, + uint32_t nextSize) { + auto rowReader = createReader(scanSpec, rowType); + runtimeStats_ = dwio::common::RuntimeStatistics(); + + rowReader->resetFilterCaches(); + auto result = BaseVector::create(rowType, 1, leafPool_.get()); + int resultSize = 0; + while (true) { + bool hasData = rowReader->next(nextSize, result); + + if (!hasData) { + break; + } + if (result->size() == 0) { + continue; + } + + auto rowVector = result->asUnchecked(); + for (auto i = 0; i < rowVector->childrenSize(); ++i) { + rowVector->childAt(i)->loadedVector(); + } + + VELOX_CHECK_EQ( + rowVector->childrenSize(), + 1, + "The benchmark is performed on single columns. 
So the result should only contain one column.") + + for (int i = 0; i < rowVector->size(); i++) { + resultSize += !rowVector->childAt(0)->isNullAt(i); + } + } + + rowReader->updateRuntimeStats(runtimeStats_); + return resultSize; + } + + void readSingleColumn( + const std::string& columnName, + const TypePtr& type, + float startPct, + float selectPct, + uint8_t nullsRateX100, + uint32_t nextSize) { + folly::BenchmarkSuspender suspender; + + auto rowType = ROW({columnName}, {type}); + // Generating the data (consider the null rate). + auto batches = + dataSetBuilder_->makeDataset(rowType, kNumBatches, kNumRowsPerBatch) + .withRowGroupSpecificData(kNumRowsPerRowGroup) + .withNullsForField(Subfield(columnName), nullsRateX100) + .build(); + writeToFile(*batches, true); + std::vector filterSpecs; + + // Filters on List and Map are not supported currently. + if (type->kind() != TypeKind::ARRAY && type->kind() != TypeKind::MAP) { + filterSpecs.emplace_back(createFilterSpec( + columnName, startPct, selectPct, rowType, false, false)); + } + + std::vector hitRows; + auto scanSpec = createScanSpec(*batches, rowType, filterSpecs, hitRows); + + suspender.dismiss(); + + // Filter range is generated from a small sample data of 4096 rows. So the + // upperBound and lowerBound are introduced to estimate the result size. + auto resultSize = read(rowType, scanSpec, nextSize); + + // Calculate the expected number of rows after the filters. + // Add one to expected to avoid 0 in calculating upperBound and lowerBound. + int expected = kNumBatches * kNumRowsPerBatch * + (1 - (double)nullsRateX100 / 100) * ((double)selectPct / 100) + + 1; + + // Make the upperBound and lowerBound large enough to avoid very small + // resultSize and expected size, where the diff ratio is relatively very + // large. 
+ int upperBound = expected * (1 + kFilterErrorMargin) + 1; + int lowerBound = expected * (1 - kFilterErrorMargin) - 1; + upperBound = std::max(16, upperBound); + lowerBound = std::max(0, lowerBound); + + VELOX_CHECK( + resultSize <= upperBound && resultSize >= lowerBound, + "Result Size {} and Expected Size {} Mismatch", + resultSize, + expected); + } + + private: + const std::string fileName_ = "test.parquet"; + const std::shared_ptr + fileFolder_ = facebook::velox::exec::test::TempDirectoryPath::create(); + const bool disableDictionary_; + + std::unique_ptr dataSetBuilder_; + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + std::unique_ptr writer_; + RuntimeStatistics runtimeStats_; +}; + +namespace { +void run( + uint32_t, + const std::string& columnName, + const TypePtr& type, + float filterRateX100, + uint8_t nullsRateX100, + uint32_t nextSize, + bool disableDictionary) { + RowTypePtr rowType = ROW({columnName}, {type}); + ParquetReaderBenchmark benchmark(disableDictionary, rowType); + BIGINT()->toString(); + benchmark.readSingleColumn( + columnName, type, 0, filterRateX100, nullsRateX100, nextSize); +} +} // namespace diff --git a/velox/dwio/parquet/tests/reader/ParquetReaderBenchmarkTest.cpp b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmarkTest.cpp new file mode 100644 index 000000000000..a6b263a9450d --- /dev/null +++ b/velox/dwio/parquet/tests/reader/ParquetReaderBenchmarkTest.cpp @@ -0,0 +1,24 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.h" +#include <gtest/gtest.h> + +namespace test { +TEST(ParquetReaderBenchmarkTest, UnitTest) { + run(1, "BigInt", facebook::velox::BIGINT(), 20, 20, 5000, false); +} +} // namespace test \ No newline at end of file