Skip to content

Commit

Permalink
Add test for ParquetReaderBenchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
nmahadevuni committed Feb 12, 2024
1 parent 7b68a82 commit f21dead
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 269 deletions.
35 changes: 25 additions & 10 deletions velox/dwio/parquet/tests/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_dwio_parquet_reader_test ParquetReaderTest.cpp)
add_test(
NAME velox_dwio_parquet_reader_test
COMMAND velox_dwio_parquet_reader_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(
velox_dwio_parquet_reader_test velox_dwio_native_parquet_reader
velox_link_libs ${TEST_LINK_LIBS})

add_executable(velox_dwio_parquet_page_reader_test ParquetPageReaderTest.cpp)
add_test(
NAME velox_dwio_parquet_page_reader_test
Expand All @@ -43,16 +34,40 @@ target_link_libraries(
ZLIB::ZLIB
${TEST_LINK_LIBS})

add_library(velox_dwio_parquet_reader_benchmark_lib ParquetReaderBenchmark.h)
target_link_libraries(
velox_dwio_parquet_reader_benchmark_lib
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_exec_test_lib
velox_exec
velox_hive_connector
Folly::folly
${FOLLY_BENCHMARK}
${TEST_LINK_LIBS})

add_executable(velox_dwio_parquet_reader_benchmark ParquetReaderBenchmark.cpp)
target_link_libraries(
velox_dwio_parquet_reader_benchmark
velox_dwio_parquet_reader_benchmark_lib
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_exec_test_lib
velox_exec
velox_hive_connector
Folly::folly
${FOLLY_BENCHMARK})
${FOLLY_BENCHMARK}
${TEST_LINK_LIBS})

add_executable(velox_dwio_parquet_reader_test ParquetReaderTest.cpp
ParquetReaderBenchmarkTest.cpp)
add_test(
NAME velox_dwio_parquet_reader_test
COMMAND velox_dwio_parquet_reader_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(
velox_dwio_parquet_reader_test velox_dwio_native_parquet_reader
velox_dwio_parquet_reader_benchmark_lib velox_link_libs ${TEST_LINK_LIBS})

add_executable(velox_dwio_parquet_structure_decoder_test
NestedStructureDecoderTest.cpp)
Expand Down
260 changes: 1 addition & 259 deletions velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,272 +14,14 @@
* limitations under the License.
*/

#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Statistics.h"
#include "velox/dwio/common/tests/utils/DataSetBuilder.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/dwio/parquet/writer/Writer.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

#include <folly/Benchmark.h>
#include <folly/init/Init.h>
#include "velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.h"

using namespace facebook::velox;
using namespace facebook::velox::dwio;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::parquet;
using namespace facebook::velox::test;

// Data-set shape: 50 batches of 60,000 rows each, written with row groups of
// 10,000 rows so that row-group skipping can be exercised.
const uint32_t kNumRowsPerBatch = 60000;
const uint32_t kNumBatches = 50;
const uint32_t kNumRowsPerRowGroup = 10000;
// Allowed relative deviation between the observed and the expected number of
// rows surviving the filter (used when validating the read result size).
const double kFilterErrorMargin = 0.2;

// Benchmark fixture: writes a single-column Parquet file into a temp
// directory, then measures reading it back through ParquetReader with
// configurable filter selectivity, null rate and read batch size.
class ParquetReaderBenchmark {
 public:
  /// Sets up a Parquet writer targeting 'test.parquet' in a fresh temp
  /// directory.
  /// @param disableDictionary When true the file is written in plain
  /// encoding; otherwise dictionary encoding is used.
  /// @param rowType Schema of the data to be written and later read back.
  explicit ParquetReaderBenchmark(
      bool disableDictionary,
      const RowTypePtr& rowType)
      : disableDictionary_(disableDictionary) {
    rootPool_ = memory::memoryManager()->addRootPool("ParquetReaderBenchmark");
    leafPool_ = rootPool_->addLeafChild("ParquetReaderBenchmark");
    dataSetBuilder_ = std::make_unique<DataSetBuilder>(*leafPool_, 0);
    auto path = fileFolder_->path + "/" + fileName_;
    auto localWriteFile = std::make_unique<LocalWriteFile>(path, true, false);
    auto sink =
        std::make_unique<WriteFileSink>(std::move(localWriteFile), path);
    facebook::velox::parquet::WriterOptions options;
    if (disableDictionary_) {
      // The parquet file is in plain encoding format.
      options.enableDictionary = false;
    }
    options.memoryPool = rootPool_.get();
    writer_ = std::make_unique<facebook::velox::parquet::Writer>(
        std::move(sink), options, rowType);
  }

  // All members are RAII-managed (smart pointers, TempDirectoryPath); no
  // manual cleanup is needed.
  ~ParquetReaderBenchmark() = default;

  /// Writes all 'batches' to the Parquet file and finalizes it.
  /// @param forRowGroupSkip Unused; kept for signature compatibility.
  void writeToFile(
      const std::vector<RowVectorPtr>& batches,
      bool /*forRowGroupSkip*/) {
    for (auto& batch : batches) {
      writer_->write(batch);
    }
    writer_->flush();
    writer_->close();
  }

  /// Builds a FilterSpec for the single child column of 'type', choosing the
  /// filter kind from the column's TypeKind.
  /// @param startPct Starting percentile of the value range the filter keeps.
  /// @param selectPct Approximate percentage of rows the filter selects.
  /// @param isForRowGroupSkip Whether the filter targets row-group skipping.
  /// @param allowNulls Whether nulls pass the filter.
  /// @throws VeloxException for unsupported column types.
  FilterSpec createFilterSpec(
      const std::string& columnName,
      float startPct,
      float selectPct,
      const TypePtr& type,
      bool isForRowGroupSkip,
      bool allowNulls) {
    switch (type->childAt(0)->kind()) {
      case TypeKind::BIGINT:
      case TypeKind::INTEGER:
        return FilterSpec(
            columnName,
            startPct,
            selectPct,
            FilterKind::kBigintRange,
            isForRowGroupSkip,
            allowNulls);
      case TypeKind::DOUBLE:
        return FilterSpec(
            columnName,
            startPct,
            selectPct,
            FilterKind::kDoubleRange,
            isForRowGroupSkip,
            allowNulls);
      case TypeKind::HUGEINT:
        return FilterSpec(
            columnName,
            startPct,
            selectPct,
            FilterKind::kHugeintRange,
            isForRowGroupSkip,
            allowNulls);
      case TypeKind::VARCHAR:
        return FilterSpec(
            columnName,
            startPct,
            selectPct,
            FilterKind::kBytesRange,
            isForRowGroupSkip,
            allowNulls);
      default:
        VELOX_FAIL("Unsupported Data Type {}", type->childAt(0)->toString());
    }
    // Unreachable: every case above returns and the default throws. Kept only
    // to silence compilers that cannot see VELOX_FAIL never returns.
    return FilterSpec(columnName, startPct, selectPct, FilterKind(), false);
  }

  /// Converts 'filterSpecs' into subfield filters derived from the sample
  /// data in 'batches' and wraps them into a ScanSpec.
  /// @param hitRows Receives the row numbers expected to pass the filters.
  std::shared_ptr<ScanSpec> createScanSpec(
      const std::vector<RowVectorPtr>& batches,
      RowTypePtr& rowType,
      const std::vector<FilterSpec>& filterSpecs,
      std::vector<uint64_t>& hitRows) {
    auto filterGenerator = std::make_unique<FilterGenerator>(rowType, 0);
    auto filters = filterGenerator->makeSubfieldFilters(
        filterSpecs, batches, nullptr, hitRows);
    auto scanSpec = filterGenerator->makeScanSpec(std::move(filters));
    return scanSpec;
  }

  /// Opens the previously written file and creates a RowReader that projects
  /// all columns of 'rowType' and applies 'scanSpec'.
  std::unique_ptr<RowReader> createReader(
      std::shared_ptr<ScanSpec> scanSpec,
      const RowTypePtr& rowType) {
    dwio::common::ReaderOptions readerOpts{leafPool_.get()};
    auto input = std::make_unique<BufferedInput>(
        std::make_shared<LocalReadFile>(fileFolder_->path + "/" + fileName_),
        readerOpts.getMemoryPool());

    std::unique_ptr<Reader> reader =
        std::make_unique<ParquetReader>(std::move(input), readerOpts);

    dwio::common::RowReaderOptions rowReaderOpts;
    rowReaderOpts.select(
        std::make_shared<facebook::velox::dwio::common::ColumnSelector>(
            rowType, rowType->names()));
    rowReaderOpts.setScanSpec(scanSpec);
    auto rowReader = reader->createRowReader(rowReaderOpts);

    return rowReader;
  }

  // This method is the place where we do the read operations.
  // scanSpec contains the setting of filters. e.g.
  // filterRateX100 = 30 means it would filter out 70% of rows and 30% remain.
  // nullsRateX100 = 70 means 70% of rows are nulls and 30% remain.
  // Return the number of non-null rows that pass the filter.
  int read(
      const RowTypePtr& rowType,
      std::shared_ptr<ScanSpec> scanSpec,
      uint32_t nextSize) {
    auto rowReader = createReader(scanSpec, rowType);
    runtimeStats_ = dwio::common::RuntimeStatistics();

    rowReader->resetFilterCaches();
    auto result = BaseVector::create(rowType, 1, leafPool_.get());
    int resultSize = 0;
    while (true) {
      bool hasData = rowReader->next(nextSize, result);

      if (!hasData) {
        break;
      }
      if (result->size() == 0) {
        continue;
      }

      auto rowVector = result->asUnchecked<RowVector>();
      // Force-load every child so lazy-vector decode cost is included in the
      // measured read.
      for (size_t i = 0; i < rowVector->childrenSize(); ++i) {
        rowVector->childAt(i)->loadedVector();
      }

      VELOX_CHECK_EQ(
          rowVector->childrenSize(),
          1,
          "The benchmark is performed on single columns. So the result should only contain one column.")

      // Count the non-null rows that survived the filter.
      for (int i = 0; i < rowVector->size(); ++i) {
        resultSize += !rowVector->childAt(0)->isNullAt(i);
      }
    }

    rowReader->updateRuntimeStats(runtimeStats_);
    return resultSize;
  }

  /// Generates a one-column data set (data generation and scan-spec setup are
  /// excluded from the timing via BenchmarkSuspender), writes it to the
  /// Parquet file, reads it back with the given filter/null rates, and
  /// validates that the result cardinality is within kFilterErrorMargin of
  /// the expectation.
  /// @param startPct Starting percentile for the filter range.
  /// @param selectPct Percentage of rows the filter should select.
  /// @param nullsRateX100 Percentage of rows that are null.
  /// @param nextSize Batch size passed to RowReader::next().
  void readSingleColumn(
      const std::string& columnName,
      const TypePtr& type,
      float startPct,
      float selectPct,
      uint8_t nullsRateX100,
      uint32_t nextSize) {
    folly::BenchmarkSuspender suspender;

    auto rowType = ROW({columnName}, {type});
    // Generating the data (consider the null rate).
    auto batches =
        dataSetBuilder_->makeDataset(rowType, kNumBatches, kNumRowsPerBatch)
            .withRowGroupSpecificData(kNumRowsPerRowGroup)
            .withNullsForField(Subfield(columnName), nullsRateX100)
            .build();
    writeToFile(*batches, true);
    std::vector<FilterSpec> filterSpecs;

    // Filters on List and Map are not supported currently.
    if (type->kind() != TypeKind::ARRAY && type->kind() != TypeKind::MAP) {
      filterSpecs.emplace_back(createFilterSpec(
          columnName, startPct, selectPct, rowType, false, false));
    }

    std::vector<uint64_t> hitRows;
    auto scanSpec = createScanSpec(*batches, rowType, filterSpecs, hitRows);

    suspender.dismiss();

    // Filter range is generated from a small sample data of 4096 rows. So the
    // upperBound and lowerBound are introduced to estimate the result size.
    auto resultSize = read(rowType, scanSpec, nextSize);

    // Calculate the expected number of rows after the filters.
    // Add one to expected to avoid 0 in calculating upperBound and lowerBound.
    int expected = kNumBatches * kNumRowsPerBatch *
            (1 - (double)nullsRateX100 / 100) * ((double)selectPct / 100) +
        1;

    // Make the upperBound and lowerBound large enough to avoid very small
    // resultSize and expected size, where the diff ratio is relatively very
    // large.
    int upperBound = expected * (1 + kFilterErrorMargin) + 1;
    int lowerBound = expected * (1 - kFilterErrorMargin) - 1;
    upperBound = std::max(16, upperBound);
    lowerBound = std::max(0, lowerBound);

    VELOX_CHECK(
        resultSize <= upperBound && resultSize >= lowerBound,
        "Result Size {} and Expected Size {} Mismatch",
        resultSize,
        expected);
  }

 private:
  const std::string fileName_ = "test.parquet";
  // Temp directory owning the benchmark file; removed on destruction.
  const std::shared_ptr<facebook::velox::exec::test::TempDirectoryPath>
      fileFolder_ = facebook::velox::exec::test::TempDirectoryPath::create();
  const bool disableDictionary_;

  std::unique_ptr<test::DataSetBuilder> dataSetBuilder_;
  std::shared_ptr<memory::MemoryPool> rootPool_;
  std::shared_ptr<memory::MemoryPool> leafPool_;
  std::unique_ptr<facebook::velox::parquet::Writer> writer_;
  RuntimeStatistics runtimeStats_;
};

// Benchmark entry point invoked by the BENCHMARK_NAMED_PARAM macros below:
// builds a one-column row type, constructs the fixture, and performs a single
// timed read with the given filter/null rates.
// The unnamed uint32_t is the iteration count folly::Benchmark passes to
// every benchmark function; it is unused here.
void run(
    uint32_t,
    const std::string& columnName,
    const TypePtr& type,
    float filterRateX100,
    uint8_t nullsRateX100,
    uint32_t nextSize,
    bool disableDictionary) {
  RowTypePtr rowType = ROW({columnName}, {type});
  ParquetReaderBenchmark benchmark(disableDictionary, rowType);
  // NOTE(review): the result is discarded — presumably this warms up the
  // static BIGINT type singleton so its creation is not timed inside
  // readSingleColumn; confirm whether it can be removed.
  BIGINT()->toString();
  benchmark.readSingleColumn(
      columnName, type, 0, filterRateX100, nullsRateX100, nextSize);
}

#define PARQUET_BENCHMARKS_FILTER_NULLS(_type_, _name_, _filter_, _null_) \
BENCHMARK_NAMED_PARAM( \
run, \
Expand Down
Loading

0 comments on commit f21dead

Please sign in to comment.