Skip to content

Commit 8f0adb1

Browse files
duanmengfacebook-github-bot
authored andcommitted
Consolidate SpillStats (facebookincubator#9211)
Summary: Decouple spill stats from the spiller as row number and hash probe spilling might use more than one and different spillers. Consolidate to use one spill stats to collect the spill stats to streamline implementation. This PR introduces a synchronized spill stats within the operator to gather these stats and later on we could separate them for different types of spiller if offline analysis needs. Pull Request resolved: facebookincubator#9211 Reviewed By: tanjialiang Differential Revision: D55287100 Pulled By: xiaoxmeng fbshipit-source-id: ffde57d4f3425e3f3f679252504f7690e8dfce68
1 parent 3d30bf5 commit 8f0adb1

34 files changed

+196
-203
lines changed

velox/connectors/hive/HiveDataSink.cpp

+6-5
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,9 @@ DataSink::Stats HiveDataSink::stats() const {
510510
for (int i = 0; i < writerInfo_.size(); ++i) {
511511
const auto& info = writerInfo_.at(i);
512512
VELOX_CHECK_NOT_NULL(info);
513-
if (!info->spillStats->empty()) {
514-
stats.spillStats += *info->spillStats;
513+
const auto spillStats = info->spillStats->rlock();
514+
if (!spillStats->empty()) {
515+
stats.spillStats += *spillStats;
515516
}
516517
}
517518
return stats;
@@ -719,15 +720,15 @@ HiveDataSink::maybeCreateBucketSortWriter(
719720
sortCompareFlags_,
720721
sortPool,
721722
writerInfo_.back()->nonReclaimableSectionHolder.get(),
722-
spillConfig_);
723+
spillConfig_,
724+
writerInfo_.back()->spillStats.get());
723725
return std::make_unique<dwio::common::SortingWriter>(
724726
std::move(writer),
725727
std::move(sortBuffer),
726728
hiveConfig_->sortWriterMaxOutputRows(
727729
connectorQueryCtx_->sessionProperties()),
728730
hiveConfig_->sortWriterMaxOutputBytes(
729-
connectorQueryCtx_->sessionProperties()),
730-
writerInfo_.back()->spillStats.get());
731+
connectorQueryCtx_->sessionProperties()));
731732
}
732733

733734
void HiveDataSink::splitInputRowsAndEnsureWriters() {

velox/connectors/hive/HiveDataSink.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ struct HiveWriterInfo {
355355
std::shared_ptr<memory::MemoryPool> _sortPool)
356356
: writerParameters(std::move(parameters)),
357357
nonReclaimableSectionHolder(new tsan_atomic<bool>(false)),
358-
spillStats(new common::SpillStats()),
358+
spillStats(std::make_unique<folly::Synchronized<common::SpillStats>>()),
359359
writerPool(std::move(_writerPool)),
360360
sinkPool(std::move(_sinkPool)),
361361
sortPool(std::move(_sortPool)) {}
@@ -364,7 +364,7 @@ struct HiveWriterInfo {
364364
const std::unique_ptr<tsan_atomic<bool>> nonReclaimableSectionHolder;
365365
/// Collects the spill stats from sort writer if the spilling has been
366366
/// triggered.
367-
const std::unique_ptr<common::SpillStats> spillStats;
367+
const std::unique_ptr<folly::Synchronized<common::SpillStats>> spillStats;
368368
const std::shared_ptr<memory::MemoryPool> writerPool;
369369
const std::shared_ptr<memory::MemoryPool> sinkPool;
370370
const std::shared_ptr<memory::MemoryPool> sortPool;

velox/dwio/common/SortingWriter.cpp

+2-9
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,15 @@ SortingWriter::SortingWriter(
2222
std::unique_ptr<Writer> writer,
2323
std::unique_ptr<exec::SortBuffer> sortBuffer,
2424
uint32_t maxOutputRowsConfig,
25-
uint64_t maxOutputBytesConfig,
26-
velox::common::SpillStats* spillStats)
25+
uint64_t maxOutputBytesConfig)
2726
: outputWriter_(std::move(writer)),
2827
maxOutputRowsConfig_(maxOutputRowsConfig),
2928
maxOutputBytesConfig_(maxOutputBytesConfig),
3029
sortPool_(sortBuffer->pool()),
3130
canReclaim_(sortBuffer->canSpill()),
32-
spillStats_(spillStats),
3331
sortBuffer_(std::move(sortBuffer)) {
3432
VELOX_CHECK_GT(maxOutputRowsConfig_, 0);
3533
VELOX_CHECK_GT(maxOutputBytesConfig_, 0);
36-
VELOX_CHECK_NOT_NULL(spillStats_);
3734
if (sortPool_->parent()->reclaimer() != nullptr) {
3835
sortPool_->setReclaimer(MemoryReclaimer::create(this));
3936
}
@@ -64,11 +61,7 @@ void SortingWriter::close() {
6461
outputWriter_->write(output);
6562
output = sortBuffer_->getOutput(maxOutputBatchRows);
6663
}
67-
auto spillStatsOr = sortBuffer_->spilledStats();
68-
if (spillStatsOr.has_value()) {
69-
VELOX_CHECK(canReclaim_);
70-
*spillStats_ = spillStatsOr.value();
71-
}
64+
7265
sortBuffer_.reset();
7366
sortPool_->release();
7467
outputWriter_->close();

velox/dwio/common/SortingWriter.h

+1-3
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ class SortingWriter : public Writer {
2929
std::unique_ptr<Writer> writer,
3030
std::unique_ptr<exec::SortBuffer> sortBuffer,
3131
uint32_t maxOutputRowsConfig,
32-
uint64_t maxOutputBytesConfig,
33-
velox::common::SpillStats* spillStats);
32+
uint64_t maxOutputBytesConfig);
3433

3534
~SortingWriter() override;
3635

@@ -81,7 +80,6 @@ class SortingWriter : public Writer {
8180
const uint64_t maxOutputBytesConfig_;
8281
memory::MemoryPool* const sortPool_;
8382
const bool canReclaim_;
84-
velox::common::SpillStats* const spillStats_;
8583

8684
std::unique_ptr<exec::SortBuffer> sortBuffer_;
8785
};

velox/exec/GroupingSet.cpp

+13-5
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ GroupingSet::GroupingSet(
5252
const std::optional<column_index_t>& groupIdChannel,
5353
const common::SpillConfig* spillConfig,
5454
tsan_atomic<bool>* nonReclaimableSection,
55-
OperatorCtx* operatorCtx)
55+
OperatorCtx* operatorCtx,
56+
folly::Synchronized<common::SpillStats>* spillStats)
5657
: preGroupedKeyChannels_(std::move(preGroupedKeys)),
5758
hashers_(std::move(hashers)),
5859
isGlobal_(hashers_.empty()),
@@ -69,7 +70,8 @@ GroupingSet::GroupingSet(
6970
stringAllocator_(operatorCtx->pool()),
7071
rows_(operatorCtx->pool()),
7172
isAdaptive_(queryConfig_.hashAdaptivityEnabled()),
72-
pool_(*operatorCtx->pool()) {
73+
pool_(*operatorCtx->pool()),
74+
spillStats_(spillStats) {
7375
VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
7476
VELOX_CHECK(pool_.trackUsage());
7577
for (auto& hasher : hashers_) {
@@ -131,7 +133,8 @@ std::unique_ptr<GroupingSet> GroupingSet::createForMarkDistinct(
131133
/*groupIdColumn*/ std::nullopt,
132134
/*spillConfig*/ nullptr,
133135
nonReclaimableSection,
134-
operatorCtx);
136+
operatorCtx,
137+
/*spillStats_*/ nullptr);
135138
};
136139

137140
namespace {
@@ -939,7 +942,8 @@ void GroupingSet::spill() {
939942
makeSpillType(),
940943
rows->keyTypes().size(),
941944
std::vector<CompareFlags>(),
942-
spillConfig_);
945+
spillConfig_,
946+
spillStats_);
943947
}
944948
spiller_->spill();
945949
if (sortedAggregations_) {
@@ -958,7 +962,11 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
958962
auto* rows = table_->rows();
959963
VELOX_CHECK(pool_.trackUsage());
960964
spiller_ = std::make_unique<Spiller>(
961-
Spiller::Type::kAggregateOutput, rows, makeSpillType(), spillConfig_);
965+
Spiller::Type::kAggregateOutput,
966+
rows,
967+
makeSpillType(),
968+
spillConfig_,
969+
spillStats_);
962970

963971
spiller_->spill(rowIterator);
964972
table_->clear();

velox/exec/GroupingSet.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ class GroupingSet {
4040
const std::optional<column_index_t>& groupIdChannel,
4141
const common::SpillConfig* spillConfig,
4242
tsan_atomic<bool>* nonReclaimableSection,
43-
OperatorCtx* operatorCtx);
43+
OperatorCtx* operatorCtx,
44+
folly::Synchronized<common::SpillStats>* spillStats);
4445

4546
~GroupingSet();
4647

@@ -359,6 +360,8 @@ class GroupingSet {
359360
// Temporary for case where an aggregate in toIntermediate() outputs post-init
360361
// state of aggregate for all rows.
361362
std::vector<char*> firstGroup_;
363+
364+
folly::Synchronized<common::SpillStats>* const spillStats_;
362365
};
363366

364367
} // namespace facebook::velox::exec

velox/exec/HashAggregation.cpp

+2-12
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ void HashAggregation::initialize() {
106106
groupIdChannel,
107107
spillConfig_.has_value() ? &spillConfig_.value() : nullptr,
108108
&nonReclaimableSection_,
109-
operatorCtx_.get());
109+
operatorCtx_.get(),
110+
&spillStats_);
110111

111112
aggregationNode_.reset();
112113
}
@@ -188,13 +189,6 @@ void HashAggregation::updateRuntimeStats() {
188189
RuntimeMetric(hashTableStats.numTombstones);
189190
}
190191

191-
void HashAggregation::recordSpillStats() {
192-
auto spillStatsOr = groupingSet_->spilledStats();
193-
if (spillStatsOr.has_value()) {
194-
Operator::recordSpillStats(spillStatsOr.value());
195-
}
196-
}
197-
198192
void HashAggregation::prepareOutput(vector_size_t size) {
199193
if (output_) {
200194
VectorPtr output = std::move(output_);
@@ -388,7 +382,6 @@ void HashAggregation::noMoreInput() {
388382
updateEstimatedOutputRowSize();
389383
groupingSet_->noMoreInput();
390384
Operator::noMoreInput();
391-
recordSpillStats();
392385
// Release the extra reserved memory right after processing all the inputs.
393386
pool()->release();
394387
}
@@ -429,9 +422,6 @@ void HashAggregation::reclaim(
429422
// Spill all the rows starting from the next output row pointed by
430423
// 'resultIterator_'.
431424
groupingSet_->spill(resultIterator_);
432-
// NOTE: we will only spill once during the output processing stage so
433-
// record stats here.
434-
recordSpillStats();
435425
} else {
436426
// TODO: support fine-grain disk spilling based on 'targetBytes' after
437427
// having row container memory compaction support later.

velox/exec/HashAggregation.h

-4
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,6 @@ class HashAggregation : public Operator {
7272

7373
RowVectorPtr getDistinctOutput();
7474

75-
// Invoked to record the spilling stats in operator stats after processing all
76-
// the inputs.
77-
void recordSpillStats();
78-
7975
void updateEstimatedOutputRowSize();
8076

8177
std::shared_ptr<const core::AggregationNode> aggregationNode_;

velox/exec/HashBuild.cpp

+4-20
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,11 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
231231
<< spillConfig.maxSpillLevel
232232
<< ", and disable spilling for memory pool: "
233233
<< pool()->name();
234+
++spillStats_.wlock()->spillMaxLevelExceededCount;
234235
exceededMaxSpillLevelLimit_ = true;
235236
return;
236237
}
238+
exceededMaxSpillLevelLimit_ = false;
237239
hashBits = HashBitRange(startBit, startBit + spillConfig.numPartitionBits);
238240
}
239241

@@ -243,7 +245,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
243245
table_->rows(),
244246
spillType_,
245247
std::move(hashBits),
246-
&spillConfig);
248+
&spillConfig,
249+
&spillStats_);
247250

248251
const int32_t numPartitions = spiller_->hashBits().numPartitions();
249252
spillInputIndicesBuffers_.resize(numPartitions);
@@ -732,15 +735,13 @@ bool HashBuild::finishHashBuild() {
732735
}
733736
if (spiller != nullptr) {
734737
spiller->finishSpill(spillPartitions);
735-
build->recordSpillStats(spiller.get());
736738
}
737739
}
738740

739741
if (spiller_ != nullptr) {
740742
spiller_->finishSpill(spillPartitions);
741743
removeEmptyPartitions(spillPartitions);
742744
}
743-
recordSpillStats();
744745

745746
// TODO: re-enable parallel join build with spilling triggered after
746747
// https://github.com/facebookincubator/velox/issues/3567 is fixed.
@@ -765,23 +766,6 @@ bool HashBuild::finishHashBuild() {
765766
return true;
766767
}
767768

768-
void HashBuild::recordSpillStats() {
769-
recordSpillStats(spiller_.get());
770-
}
771-
772-
void HashBuild::recordSpillStats(Spiller* spiller) {
773-
if (spiller != nullptr) {
774-
const auto spillStats = spiller->stats();
775-
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
776-
Operator::recordSpillStats(spillStats);
777-
} else if (exceededMaxSpillLevelLimit_) {
778-
exceededMaxSpillLevelLimit_ = false;
779-
common::SpillStats spillStats;
780-
spillStats.spillMaxLevelExceededCount = 1;
781-
Operator::recordSpillStats(spillStats);
782-
}
783-
}
784-
785769
void HashBuild::ensureTableFits(uint64_t numRows) {
786770
// NOTE: we don't need memory reservation if all the partitions have been
787771
// spilled as nothing need to be built.

velox/exec/HashBuild.h

-3
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ class HashBuild final : public Operator {
117117
return canReclaim();
118118
}
119119

120-
void recordSpillStats();
121-
void recordSpillStats(Spiller* spiller);
122-
123120
// Indicates if the input is read from spill data or not.
124121
bool isInputFromSpill() const;
125122

velox/exec/HashProbe.cpp

+4-10
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ void HashProbe::maybeSetupSpillInput(
253253
spillInputPartitionIds_.begin()->partitionBitOffset(),
254254
spillInputPartitionIds_.begin()->partitionBitOffset() +
255255
spillConfig.numPartitionBits),
256-
&spillConfig);
256+
&spillConfig,
257+
&spillStats_);
257258
// Set the spill partitions to the corresponding ones at the build side. The
258259
// hash probe operator itself won't trigger any spilling.
259260
spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_));
@@ -1382,7 +1383,8 @@ void HashProbe::noMoreInputInternal() {
13821383
VELOX_CHECK_EQ(
13831384
spillInputPartitionIds_.size(), spiller_->spilledPartitionSet().size());
13841385
spiller_->finishSpill(spillPartitionSet_);
1385-
recordSpillStats();
1386+
VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeUs, 0);
1387+
VELOX_CHECK_EQ(spillStats_.rlock()->spillFillTimeUs, 0);
13861388
}
13871389

13881390
const bool hasSpillData = hasMoreSpillData();
@@ -1412,14 +1414,6 @@ void HashProbe::noMoreInputInternal() {
14121414
lastProber_ = true;
14131415
}
14141416

1415-
void HashProbe::recordSpillStats() {
1416-
VELOX_CHECK_NOT_NULL(spiller_);
1417-
const auto spillStats = spiller_->stats();
1418-
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
1419-
VELOX_CHECK_EQ(spillStats.spillFillTimeUs, 0);
1420-
Operator::recordSpillStats(spillStats);
1421-
}
1422-
14231417
bool HashProbe::isFinished() {
14241418
return state_ == ProbeOperatorState::kFinish;
14251419
}

velox/exec/HashProbe.h

-2
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,6 @@ class HashProbe : public Operator {
220220
// next hash table from the spilled data.
221221
void noMoreInputInternal();
222222

223-
void recordSpillStats();
224-
225223
// Returns the index of the 'match' column in the output for semi project
226224
// joins.
227225
VectorPtr& matchColumn() const {

0 commit comments

Comments
 (0)