@@ -647,7 +647,9 @@ RowVectorPtr Task::next(ContinueFuture* future) {
647
647
}
648
648
649
649
VELOX_CHECK_EQ (
650
- state_, TaskState::kRunning , " Task has already finished processing." );
650
+ static_cast <int >(state_),
651
+ static_cast <int >(kRunning ),
652
+ " Task has already finished processing." );
651
653
652
654
// On first call, create the drivers.
653
655
if (driverFactories_.empty ()) {
@@ -682,11 +684,6 @@ RowVectorPtr Task::next(ContinueFuture* future) {
682
684
}
683
685
684
686
drivers_ = std::move (drivers);
685
- driverBlockingStates_.reserve (drivers_.size ());
686
- for (auto i = 0 ; i < drivers_.size (); ++i) {
687
- driverBlockingStates_.emplace_back (
688
- std::make_unique<DriverBlockingState>(drivers_[i].get ()));
689
- }
690
687
}
691
688
692
689
// Run drivers one at a time. If a driver blocks, continue running the other
@@ -701,10 +698,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
701
698
int runnableDrivers = 0 ;
702
699
int blockedDrivers = 0 ;
703
700
for (auto i = 0 ; i < numDrivers; ++i) {
704
- // Holds a reference to driver for access as async task terminate might
705
- // remove drivers from 'drivers_' slot.
706
- auto driver = getDriver (i);
707
- if (driver == nullptr ) {
701
+ if (drivers_[i] == nullptr ) {
708
702
// This driver has finished processing.
709
703
continue ;
710
704
}
@@ -715,25 +709,16 @@ RowVectorPtr Task::next(ContinueFuture* future) {
715
709
continue ;
716
710
}
717
711
718
- ContinueFuture blockFuture = ContinueFuture::makeEmpty ();
719
- if (driverBlockingStates_[i]->blocked (&blockFuture)) {
720
- VELOX_CHECK (blockFuture.valid ());
721
- futures[i] = std::move (blockFuture);
722
- // This driver is still blocked.
723
- ++blockedDrivers;
724
- continue ;
725
- }
726
712
++runnableDrivers;
727
713
728
714
ContinueFuture driverFuture = ContinueFuture::makeEmpty ();
729
- auto result = driver->next (&driverFuture);
730
- if (result != nullptr ) {
731
- VELOX_CHECK (!driverFuture.valid ());
715
+ auto result = drivers_[i]->next (&driverFuture);
716
+ if (result) {
732
717
return result;
733
718
}
734
719
735
720
if (driverFuture.valid ()) {
736
- driverBlockingStates_ [i]-> setDriverFuture (driverFuture);
721
+ futures [i] = std::move (driverFuture);
737
722
}
738
723
739
724
if (error ()) {
@@ -743,7 +728,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
743
728
744
729
if (runnableDrivers == 0 ) {
745
730
if (blockedDrivers > 0 ) {
746
- if (future == nullptr ) {
731
+ if (! future) {
747
732
VELOX_FAIL (
748
733
" Cannot make progress as all remaining drivers are blocked and user are not expected to wait." );
749
734
} else {
@@ -753,20 +738,14 @@ RowVectorPtr Task::next(ContinueFuture* future) {
753
738
notReadyFutures.emplace_back (std::move (continueFuture));
754
739
}
755
740
}
756
- *future = folly::collectAny (std::move (notReadyFutures)).unit ();
741
+ *future = folly::collectAll (std::move (notReadyFutures)).unit ();
757
742
}
758
743
}
759
744
return nullptr ;
760
745
}
761
746
}
762
747
}
763
748
764
- std::shared_ptr<Driver> Task::getDriver (uint32_t driverId) const {
765
- VELOX_CHECK_LT (driverId, drivers_.size ());
766
- std::unique_lock<std::timed_mutex> l (mutex_);
767
- return drivers_[driverId];
768
- }
769
-
770
749
void Task::start (uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
771
750
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
772
751
queryCtx ()->queryId (), taskId_, nullptr };
@@ -1501,7 +1480,7 @@ void Task::noMoreSplits(const core::PlanNodeId& planNodeId) {
1501
1480
}
1502
1481
1503
1482
if (allFinished) {
1504
- terminate (TaskState:: kFinished );
1483
+ terminate (kFinished );
1505
1484
}
1506
1485
}
1507
1486
@@ -3123,68 +3102,4 @@ void Task::MemoryReclaimer::abort(
3123
3102
<< " Timeout waiting for task to complete during query memory aborting." ;
3124
3103
}
3125
3104
}
3126
-
3127
- void Task::DriverBlockingState::setDriverFuture (ContinueFuture& driverFuture) {
3128
- VELOX_CHECK (!blocked_);
3129
- {
3130
- std::lock_guard<std::mutex> l (mutex_);
3131
- VELOX_CHECK (promises_.empty ());
3132
- VELOX_CHECK_NULL (error_);
3133
- blocked_ = true ;
3134
- }
3135
- std::move (driverFuture)
3136
- .via (&folly::InlineExecutor::instance ())
3137
- .thenValue (
3138
- [&, driverHolder = driver_->shared_from_this ()](auto && /* unused */ ) {
3139
- std::vector<std::unique_ptr<ContinuePromise>> promises;
3140
- {
3141
- std::lock_guard<std::mutex> l (mutex_);
3142
- VELOX_CHECK (blocked_);
3143
- VELOX_CHECK_NULL (error_);
3144
- promises = std::move (promises_);
3145
- blocked_ = false ;
3146
- }
3147
- for (auto & promise : promises) {
3148
- promise->setValue ();
3149
- }
3150
- })
3151
- .thenError (
3152
- folly::tag_t <std::exception >{},
3153
- [&, driverHolder = driver_->shared_from_this ()](
3154
- std::exception const & e) {
3155
- std::lock_guard<std::mutex> l (mutex_);
3156
- VELOX_CHECK (blocked_);
3157
- VELOX_CHECK_NULL (error_);
3158
- try {
3159
- VELOX_FAIL (
3160
- " A driver future from task {} was realized with error: {}" ,
3161
- driver_->task ()->taskId (),
3162
- e.what ());
3163
- } catch (const VeloxException&) {
3164
- error_ = std::current_exception ();
3165
- }
3166
- blocked_ = false ;
3167
- });
3168
- }
3169
-
3170
- bool Task::DriverBlockingState::blocked (ContinueFuture* future) {
3171
- VELOX_CHECK_NOT_NULL (future);
3172
- std::lock_guard<std::mutex> l (mutex_);
3173
- if (error_ != nullptr ) {
3174
- std::rethrow_exception (error_);
3175
- }
3176
- if (!blocked_) {
3177
- VELOX_CHECK (promises_.empty ());
3178
- return false ;
3179
- }
3180
- auto [blockPromise, blockFuture] =
3181
- makeVeloxContinuePromiseContract (fmt::format (
3182
- " DriverBlockingState {} from task {}" ,
3183
- driver_->driverCtx ()->driverId ,
3184
- driver_->task ()->taskId ()));
3185
- *future = std::move (blockFuture);
3186
- promises_.emplace_back (
3187
- std::make_unique<ContinuePromise>(std::move (blockPromise)));
3188
- return true ;
3189
- }
3190
3105
} // namespace facebook::velox::exec
0 commit comments