Skip to content

Commit

Permalink
feat: log number of rows
Browse files Browse the repository at this point in the history
Print the number of fetched rows to an external log file.
  • Loading branch information
rimarin committed Feb 22, 2024
1 parent 5164a1c commit 8e1d3f1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
2 changes: 1 addition & 1 deletion extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct ParquetReaderScanState {
bool prefetch_mode = false;
bool current_group_prefetched = false;
set<string> fetchedFiles;
unordered_map<string, set<int64_t>> fetchedRowGroups;
unordered_map<string, set<uint64_t>> fetchedRowGroups;
};

struct ParquetColumnDefinition {
Expand Down
20 changes: 19 additions & 1 deletion extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState {
vector<column_t> column_ids;
TableFilterSet *filters;
set<string> fetchedFiles;
unordered_map<string, set<int64_t>> fetchedRowsGroups;
unordered_map<string, set<uint64_t>> fetchedRowsGroups;
unordered_map<string, uint64_t> fetchedGroupsToRows;

idx_t MaxThreads() const override {
return max_threads;
Expand Down Expand Up @@ -618,6 +619,14 @@ class ParquetScanFunction {
gstate.fetchedFiles.insert(data.scan_state.fetchedFiles.begin(), data.scan_state.fetchedFiles.end());
for (const auto &fileToRowsGroups : data.scan_state.fetchedRowGroups){
gstate.fetchedRowsGroups[fileToRowsGroups.first].insert(fileToRowsGroups.second.begin(), fileToRowsGroups.second.end());
auto fileRowGroups = gstate.fetchedRowsGroups[data.reader->file_name];
for (const auto &rowGroupIdx : fileRowGroups){
auto rowGroupsMetadata = data.reader->metadata->metadata->row_groups;
if (rowGroupIdx < rowGroupsMetadata.size()){
auto rowGroupMetadata = rowGroupsMetadata[rowGroupIdx];
gstate.fetchedGroupsToRows[data.reader->file_name + to_string(rowGroupIdx)] = rowGroupMetadata.num_rows;
}
}
}

bind_data.chunk_count++;
Expand All @@ -633,6 +642,15 @@ class ParquetScanFunction {
numPartitionsFile.open("partitions.log", std::fstream::out);
numPartitionsFile << gstate.fetchedFiles.size() << "\n";
numPartitionsFile.close();
// Log the number of rows
uint64_t totalRows = 0;
for (const auto &rowGroupToRows : gstate.fetchedGroupsToRows){
totalRows += rowGroupToRows.second;
}
std::ofstream numRowsFile;
numRowsFile.open("rows.log", std::fstream::out);
numRowsFile << totalRows << "\n";
numRowsFile.close();
// Log the number of fetched row groups
uint64_t totalRowGroups = 0;
for (const auto &fileToRowsGroups : gstate.fetchedRowsGroups){
Expand Down
2 changes: 1 addition & 1 deletion extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ bool ParquetReader::ScanInternal(ParquetReaderScanState &state, DataChunk &resul

if (!rowGroupSkipped){
// Keep track of the fetched row group
auto fetchedRowGroup = state.group_idx_list[state.current_group];
uint64_t fetchedRowGroup = state.group_idx_list[state.current_group];
state.fetchedRowGroups[file_name].emplace(fetchedRowGroup);
// Keep track of the fetched file
state.fetchedFiles.emplace(file_name);
Expand Down

0 comments on commit 8e1d3f1

Please sign in to comment.