@@ -120,7 +120,7 @@ HashBuild::HashBuild(
120
120
tableType_ = ROW (std::move (names), std::move (types));
121
121
setupTable ();
122
122
setupSpiller ();
123
- intermediateStateCleared_ = false ;
123
+ stateCleared_ = false ;
124
124
}
125
125
126
126
void HashBuild::initialize () {
@@ -784,11 +784,11 @@ bool HashBuild::finishHashBuild() {
784
784
}
785
785
}
786
786
{
787
- std::lock_guard<std::mutex> l (build->intermediateStateMutex_ );
787
+ std::lock_guard<std::mutex> l (build->mutex_ );
788
788
VELOX_CHECK (
789
- !build->intermediateStateCleared_ ,
790
- " Intermediate state for a peer is empty. It might have been "
791
- " already closed." );
789
+ !build->stateCleared_ ,
790
+ " Internal state for a peer is empty. It might have already "
791
+ " been closed." );
792
792
numRows += build->table_ ->rows ()->numRows ();
793
793
}
794
794
otherBuilds.push_back (build);
@@ -800,22 +800,22 @@ bool HashBuild::finishHashBuild() {
800
800
otherTables.reserve (peers.size ());
801
801
SpillPartitionSet spillPartitions;
802
802
for (auto * build : otherBuilds) {
803
- std::unique_ptr<Spiller> buildSpiller ;
803
+ std::unique_ptr<Spiller> spiller ;
804
804
{
805
- std::lock_guard<std::mutex> l (build->intermediateStateMutex_ );
805
+ std::lock_guard<std::mutex> l (build->mutex_ );
806
806
VELOX_CHECK (
807
- !build->intermediateStateCleared_ ,
808
- " Intermediate state for a peer is empty. It might have been "
809
- " already closed." );
810
- build->intermediateStateCleared_ = true ;
807
+ !build->stateCleared_ ,
808
+ " Internal state for a peer is empty. It might have already "
809
+ " been closed." );
810
+ build->stateCleared_ = true ;
811
811
VELOX_CHECK_NOT_NULL (build->table_ );
812
812
otherTables.push_back (std::move (build->table_ ));
813
- buildSpiller = std::move (build->spiller_ );
813
+ spiller = std::move (build->spiller_ );
814
814
}
815
- if (buildSpiller != nullptr ) {
816
- buildSpiller->finishSpill (spillPartitions);
815
+ if (spiller != nullptr ) {
816
+ spiller->finishSpill (spillPartitions);
817
+ build->recordSpillStats (spiller.get ());
817
818
}
818
- build->recordSpillStats (buildSpiller.get ());
819
819
}
820
820
821
821
if (spiller_ != nullptr ) {
@@ -847,7 +847,7 @@ bool HashBuild::finishHashBuild() {
847
847
addRuntimeStats ();
848
848
if (joinBridge_->setHashTable (
849
849
std::move (table_), std::move (spillPartitions), joinHasNullKeys_)) {
850
- intermediateStateCleared_ = true ;
850
+ stateCleared_ = true ;
851
851
spillGroup_->restart ();
852
852
}
853
853
@@ -938,7 +938,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
938
938
939
939
setupTable ();
940
940
setupSpiller (spillInput.spillPartition .get ());
941
- intermediateStateCleared_ = false ;
941
+ stateCleared_ = false ;
942
942
943
943
// Start to process spill input.
944
944
processSpillInput ();
@@ -1126,10 +1126,7 @@ void HashBuild::reclaim(
1126
1126
1127
1127
TestValue::adjust (" facebook::velox::exec::HashBuild::reclaim" , this );
1128
1128
1129
- // can another thread call close() while hashbuild is in arbitration and
1130
- // reclaim is called on it?
1131
1129
if (exceededMaxSpillLevelLimit_) {
1132
- // NOTE: we might have reached to the max spill limit.
1133
1130
return ;
1134
1131
}
1135
1132
@@ -1142,9 +1139,9 @@ void HashBuild::reclaim(
1142
1139
LOG (WARNING) << " Can't reclaim from hash build operator, state_["
1143
1140
<< stateName (state_) << " ], nonReclaimableSection_["
1144
1141
<< nonReclaimableSection_ << " ], spiller_["
1145
- << (intermediateStateCleared_ || spiller_-> finalized ()
1146
- ? " finalized"
1147
- : " non-finalized" )
1142
+ << (stateCleared_ ? " cleared "
1143
+ : (spiller_-> finalized () ? " finalized"
1144
+ : " non-finalized" ) )
1148
1145
<< " ] " << pool ()->name ()
1149
1146
<< " , usage: " << succinctBytes (pool ()->currentBytes ());
1150
1147
return ;
@@ -1227,13 +1224,12 @@ bool HashBuild::nonReclaimableState() const {
1227
1224
// 1) the hash table has been built by the last build thread (inidicated
1228
1225
// by state_)
1229
1226
// 2) the last build operator has transferred ownership of 'this' operator's
1230
- // intermediate state (table_ and spiller_) to itself
1227
+ // internal state (table_ and spiller_) to itself.
1231
1228
// 3) it has completed spilling before reaching either of the previous
1232
1229
// two states.
1233
1230
return ((state_ != State::kRunning ) && (state_ != State::kWaitForBuild ) &&
1234
1231
(state_ != State::kYield )) ||
1235
- nonReclaimableSection_ || intermediateStateCleared_ ||
1236
- spiller_->finalized ();
1232
+ nonReclaimableSection_ || !spiller_ || spiller_->finalized ();
1237
1233
}
1238
1234
1239
1235
void HashBuild::close () {
@@ -1242,8 +1238,8 @@ void HashBuild::close() {
1242
1238
{
1243
1239
// Free up major memory usage. Gate access to them as they can be accessed
1244
1240
// by the last build thread that finishes building the hash table.
1245
- std::lock_guard<std::mutex> l (intermediateStateMutex_ );
1246
- intermediateStateCleared_ = true ;
1241
+ std::lock_guard<std::mutex> l (mutex_ );
1242
+ stateCleared_ = true ;
1247
1243
joinBridge_.reset ();
1248
1244
spiller_.reset ();
1249
1245
table_.reset ();
0 commit comments