Skip to content

Commit 0da92cb

Browse files
Bikramjeet Vigfacebook-github-bot
Bikramjeet Vig
authored andcommitted
Consolidate Operator's close and abort APIs (facebookincubator#8757)
Summary: The close() and abort() APIs share the same objective: to release resources held by the operator. However, they are not currently implemented uniformly across all operators. For example, HashBuild and HashProbe have abort() implemented, but not close(). This discrepancy can lead to inconsistencies in the expected effects of these API calls. For instance, when a driver is destroyed, it calls close() on all its operators before detaching itself from its parent task. All operators, with the exception of HashBuild and HashProbe, would have their resources released. The latter, however, would rely on their destructor being called, which could occur at any later point. The detachment of the driver from the task serves as a synchronization point. If we now rely on the destructor being called later, this introduces an element of indeterminism to the state of the resources. This unpredictability makes it difficult for memory management to make decisions during arbitration. This change aims to eliminate the abort() API and consolidate its functionality into close(). Additionally, it serializer access to HashBuild's internal state (table and spiller) to handle the case where it can be concurrently cleared by the task thread closing the operator and the being read by the last hash build operator attempting to build the hash table by fetching this internal state from all its peers. Reviewed By: oerling Differential Revision: D53818809
1 parent 929fd37 commit 0da92cb

10 files changed

+85
-41
lines changed

velox/exec/HashAggregation.cpp

-4
Original file line numberDiff line numberDiff line change
@@ -450,10 +450,6 @@ void HashAggregation::close() {
450450
groupingSet_.reset();
451451
}
452452

453-
void HashAggregation::abort() {
454-
close();
455-
}
456-
457453
void HashAggregation::updateEstimatedOutputRowSize() {
458454
const auto optionalRowSize = groupingSet_->estimateOutputRowSize();
459455
if (!optionalRowSize.has_value()) {

velox/exec/HashAggregation.h

-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ class HashAggregation : public Operator {
5050

5151
void close() override;
5252

53-
void abort() override;
54-
5553
private:
5654
void updateRuntimeStats();
5755

velox/exec/HashBuild.cpp

+59-17
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ HashBuild::HashBuild(
120120
tableType_ = ROW(std::move(names), std::move(types));
121121
setupTable();
122122
setupSpiller();
123+
intermediateStateCleared_ = false;
123124
}
124125

125126
void HashBuild::initialize() {
@@ -783,7 +784,14 @@ bool HashBuild::finishHashBuild() {
783784
return true;
784785
}
785786
}
786-
numRows += build->table_->rows()->numRows();
787+
{
788+
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
789+
VELOX_CHECK(
790+
!build->intermediateStateCleared_,
791+
"Intermediate state for a peer is empty. It might have been "
792+
"already closed.");
793+
numRows += build->table_->rows()->numRows();
794+
}
787795
otherBuilds.push_back(build);
788796
}
789797

@@ -793,12 +801,22 @@ bool HashBuild::finishHashBuild() {
793801
otherTables.reserve(peers.size());
794802
SpillPartitionSet spillPartitions;
795803
for (auto* build : otherBuilds) {
796-
VELOX_CHECK_NOT_NULL(build->table_);
797-
otherTables.push_back(std::move(build->table_));
798-
if (build->spiller_ != nullptr) {
799-
build->spiller_->finishSpill(spillPartitions);
804+
std::unique_ptr<Spiller> buildSpiller;
805+
{
806+
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
807+
VELOX_CHECK(
808+
!build->intermediateStateCleared_,
809+
"Intermediate state for a peer is empty. It might have been "
810+
"already closed.");
811+
build->intermediateStateCleared_ = true;
812+
VELOX_CHECK_NOT_NULL(build->table_);
813+
otherTables.push_back(std::move(build->table_));
814+
buildSpiller = std::move(build->spiller_);
815+
}
816+
if (buildSpiller != nullptr) {
817+
buildSpiller->finishSpill(spillPartitions);
800818
}
801-
build->recordSpillStats();
819+
build->recordSpillStats(buildSpiller.get());
802820
}
803821

804822
if (spiller_ != nullptr) {
@@ -830,6 +848,7 @@ bool HashBuild::finishHashBuild() {
830848
addRuntimeStats();
831849
if (joinBridge_->setHashTable(
832850
std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
851+
intermediateStateCleared_ = true;
833852
spillGroup_->restart();
834853
}
835854

@@ -840,8 +859,12 @@ bool HashBuild::finishHashBuild() {
840859
}
841860

842861
void HashBuild::recordSpillStats() {
843-
if (spiller_ != nullptr) {
844-
const auto spillStats = spiller_->stats();
862+
recordSpillStats(spiller_.get());
863+
}
864+
865+
void HashBuild::recordSpillStats(Spiller* spiller) {
866+
if (spiller != nullptr) {
867+
const auto spillStats = spiller->stats();
845868
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
846869
Operator::recordSpillStats(spillStats);
847870
} else if (exceededMaxSpillLevelLimit_) {
@@ -916,6 +939,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
916939

917940
setupTable();
918941
setupSpiller(spillInput.spillPartition.get());
942+
intermediateStateCleared_ = false;
919943

920944
// Start to process spill input.
921945
processSpillInput();
@@ -1103,7 +1127,9 @@ void HashBuild::reclaim(
11031127

11041128
TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
11051129

1106-
if (spiller_ == nullptr) {
1130+
// can another thread call close() while hashbuild is in arbitration and
1131+
// reclaim is called on it?
1132+
if (exceededMaxSpillLevelLimit_) {
11071133
// NOTE: we might have reached to the max spill limit.
11081134
return;
11091135
}
@@ -1117,7 +1143,9 @@ void HashBuild::reclaim(
11171143
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
11181144
<< stateName(state_) << "], nonReclaimableSection_["
11191145
<< nonReclaimableSection_ << "], spiller_["
1120-
<< (spiller_->finalized() ? "finalized" : "non-finalized")
1146+
<< (intermediateStateCleared_ || spiller_->finalized()
1147+
? "finalized"
1148+
: "non-finalized")
11211149
<< "] " << pool()->name()
11221150
<< ", usage: " << succinctBytes(pool()->currentBytes());
11231151
return;
@@ -1195,17 +1223,31 @@ void HashBuild::reclaim(
11951223
}
11961224

11971225
bool HashBuild::nonReclaimableState() const {
1226+
// Apart from being in the nonReclaimable section,
1227+
// its also not reclaimable if:
1228+
// 1) the hash table has been built by the last build thread (inidicated
1229+
// by state_)
1230+
// 2) the last build operator has transferred ownership of 'this' operator's
1231+
// intermediate state (table_ and spiller_) to itself
1232+
// 3) it has completed spilling before reaching either of the previous
1233+
// two states.
11981234
return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) &&
11991235
(state_ != State::kYield)) ||
1200-
nonReclaimableSection_ || spiller_->finalized();
1236+
nonReclaimableSection_ || intermediateStateCleared_ ||
1237+
spiller_->finalized();
12011238
}
12021239

1203-
void HashBuild::abort() {
1204-
Operator::abort();
1240+
void HashBuild::close() {
1241+
Operator::close();
12051242

1206-
// Free up major memory usage.
1207-
joinBridge_.reset();
1208-
spiller_.reset();
1209-
table_.reset();
1243+
{
1244+
// Free up major memory usage. Gate access to them as they can be accessed
1245+
// by the last build thread that finishes building the hash table.
1246+
std::lock_guard<std::mutex> l(intermediateStateMutex_);
1247+
intermediateStateCleared_ = true;
1248+
joinBridge_.reset();
1249+
spiller_.reset();
1250+
table_.reset();
1251+
}
12101252
}
12111253
} // namespace facebook::velox::exec

velox/exec/HashBuild.h

+18-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class HashBuild final : public Operator {
8585
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
8686
override;
8787

88-
void abort() override;
88+
void close() override;
8989

9090
private:
9191
void setState(State state);
@@ -123,6 +123,7 @@ class HashBuild final : public Operator {
123123
}
124124

125125
void recordSpillStats();
126+
void recordSpillStats(Spiller* spiller);
126127

127128
// Indicates if the input is read from spill data or not.
128129
bool isInputFromSpill() const;
@@ -267,6 +268,19 @@ class HashBuild final : public Operator {
267268
// The row type used for hash table build and disk spilling.
268269
RowTypePtr tableType_;
269270

271+
// Used to serialize access to intermediate state variables (like 'table_' and
272+
// 'spiller_'). This is only required when variables are accessed
273+
// concurrently, that is, when a thread tries to close the operator while
274+
// another thread is building the hash table. Refer to 'close()' and
275+
// finishHashBuild()' for more details.
276+
std::mutex intermediateStateMutex_;
277+
278+
// Indicates if the intermediate state ('table_' and 'spiller_') has
279+
// been cleared. This can happen either when the operator is closed or when
280+
// the last hash build operator transfers ownership of them to itself while
281+
// building the final hash table.
282+
bool intermediateStateCleared_{false};
283+
270284
// Container for the rows being accumulated.
271285
std::unique_ptr<BaseHashTable> table_;
272286

@@ -305,6 +319,9 @@ class HashBuild final : public Operator {
305319
uint64_t numSpillRows_{0};
306320
uint64_t numSpillBytes_{0};
307321

322+
// This can be nullptr if either spilling is not allowed or it has been
323+
// trsnaferred to the last hash build operator while in kWaitForBuild state or
324+
// it has been cleared to setup a new one for recursive spilling.
308325
std::unique_ptr<Spiller> spiller_;
309326

310327
// Used to read input from previously spilled data for restoring.

velox/exec/HashProbe.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1440,8 +1440,8 @@ void HashProbe::setRunning() {
14401440
setState(ProbeOperatorState::kRunning);
14411441
}
14421442

1443-
void HashProbe::abort() {
1444-
Operator::abort();
1443+
void HashProbe::close() {
1444+
Operator::close();
14451445

14461446
// Free up major memory usage.
14471447
joinBridge_.reset();

velox/exec/HashProbe.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class HashProbe : public Operator {
6666
return false;
6767
}
6868

69-
void abort() override;
69+
void close() override;
7070

7171
void clearDynamicFilters() override;
7272

velox/exec/Operator.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ void Operator::MemoryReclaimer::abort(
642642
driver->state().isTerminated);
643643
VELOX_CHECK(driver->task()->isCancelled());
644644

645-
// Calls operator abort to free up major memory usage.
646-
op_->abort();
645+
// Calls operator close to free up major memory usage.
646+
op_->close();
647647
}
648648
} // namespace facebook::velox::exec

velox/exec/Operator.h

-9
Original file line numberDiff line numberDiff line change
@@ -424,15 +424,6 @@ class Operator : public BaseRuntimeStatWriter {
424424
operatorCtx_->pool()->release();
425425
}
426426

427-
/// Invoked by memory arbitrator to free up operator's resource immediately on
428-
/// memory abort, and the query will stop running after this call.
429-
///
430-
/// NOTE: we don't expect any access to this operator except close method
431-
/// call.
432-
virtual void abort() {
433-
close();
434-
}
435-
436427
// Returns true if 'this' never has more output rows than input rows.
437428
virtual bool isFilter() const {
438429
return false;

velox/exec/OrderBy.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ RowVectorPtr OrderBy::getOutput() {
104104
return output;
105105
}
106106

107-
void OrderBy::abort() {
108-
Operator::abort();
107+
void OrderBy::close() {
108+
Operator::close();
109109
sortBuffer_.reset();
110110
}
111111

velox/exec/OrderBy.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class OrderBy : public Operator {
6060
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
6161
override;
6262

63-
void abort() override;
63+
void close() override;
6464

6565
private:
6666
// Invoked to record the spilling stats in operator stats after processing all

0 commit comments

Comments
 (0)