@@ -423,11 +423,12 @@ PartitionedOutputBufferManager::getInstance(const std::string& host) {
423
423
424
424
std::shared_ptr<PartitionedOutputBuffer>
425
425
PartitionedOutputBufferManager::getBuffer (const std::string& taskId) {
426
- std::lock_guard<std::mutex> l (mutex_);
427
- auto it = buffers_.find (taskId);
428
- VELOX_CHECK (
429
- it != buffers_.end (), " Output buffers for task not found: {}" , taskId);
430
- return it->second ;
426
+ return buffers_.withLock ([&](auto & buffers) {
427
+ auto it = buffers.find (taskId);
428
+ VELOX_CHECK (
429
+ it != buffers.end (), " Output buffers for task not found: {}" , taskId);
430
+ return it->second ;
431
+ });
431
432
}
432
433
433
434
BlockingReason PartitionedOutputBufferManager::enqueue (
@@ -450,35 +451,34 @@ void PartitionedOutputBufferManager::acknowledge(
450
451
const std::string& taskId,
451
452
int destination,
452
453
int64_t sequence) {
453
- std::shared_ptr<PartitionedOutputBuffer> buffer;
454
- {
455
- std::lock_guard<std::mutex> l (mutex_);
456
- auto it = buffers_.find (taskId);
457
- if (it == buffers_.end ()) {
458
- VLOG (1 ) << " Receiving ack for non-existent task " << taskId
459
- << " destination " << destination << " sequence " << sequence;
460
- return ;
461
- }
462
- buffer = it->second ;
454
+ auto buffer = buffers_.withLock (
455
+ [&](auto & buffers) -> std::shared_ptr<PartitionedOutputBuffer> {
456
+ auto it = buffers.find (taskId);
457
+ if (it == buffers.end ()) {
458
+ VLOG (1 ) << " Receiving ack for non-existent task " << taskId
459
+ << " destination " << destination << " sequence " << sequence;
460
+ return nullptr ;
461
+ }
462
+ return it->second ;
463
+ });
464
+ if (buffer) {
465
+ buffer->acknowledge (destination, sequence);
463
466
}
464
- buffer->acknowledge (destination, sequence);
465
467
}
466
468
467
469
void PartitionedOutputBufferManager::deleteResults (
468
470
const std::string& taskId,
469
471
int destination) {
470
- std::shared_ptr<PartitionedOutputBuffer> buffer;
471
- {
472
- std::lock_guard<std::mutex> l (mutex_);
473
- auto it = buffers_.find (taskId);
474
- if (it == buffers_.end ()) {
475
- return ;
476
- }
477
- buffer = it->second ;
478
- }
479
- if (buffer->deleteResults (destination)) {
480
- std::lock_guard<std::mutex> l (mutex_);
481
- buffers_.erase (taskId);
472
+ auto buffer = buffers_.withLock (
473
+ [&](auto & buffers) -> std::shared_ptr<PartitionedOutputBuffer> {
474
+ auto it = buffers.find (taskId);
475
+ if (it == buffers.end ()) {
476
+ return nullptr ;
477
+ }
478
+ return it->second ;
479
+ });
480
+ if (buffer && buffer->deleteResults (destination)) {
481
+ buffers_.withLock ([&](auto & buffers) { buffers.erase (taskId); });
482
482
}
483
483
}
484
484
@@ -498,15 +498,16 @@ void PartitionedOutputBufferManager::initializeTask(
498
498
int numDrivers) {
499
499
const auto & taskId = task->taskId ();
500
500
501
- std::lock_guard<std::mutex> l (mutex_);
502
- auto it = buffers_.find (taskId);
503
- if (it == buffers_.end ()) {
504
- buffers_[taskId] = std::make_shared<PartitionedOutputBuffer>(
505
- std::move (task), broadcast, numDestinations, numDrivers);
506
- } else {
507
- VELOX_FAIL (
508
- " Registering an output buffer for pre-existing taskId {}" , taskId);
509
- }
501
+ buffers_.withLock ([&](auto & buffers) {
502
+ auto it = buffers.find (taskId);
503
+ if (it == buffers.end ()) {
504
+ buffers[taskId] = std::make_shared<PartitionedOutputBuffer>(
505
+ std::move (task), broadcast, numDestinations, numDrivers);
506
+ } else {
507
+ VELOX_FAIL (
508
+ " Registering an output buffer for pre-existing taskId {}" , taskId);
509
+ }
510
+ });
510
511
}
511
512
512
513
void PartitionedOutputBufferManager::updateBroadcastOutputBuffers (
@@ -517,28 +518,32 @@ void PartitionedOutputBufferManager::updateBroadcastOutputBuffers(
517
518
}
518
519
519
520
void PartitionedOutputBufferManager::removeTask (const std::string& taskId) {
520
- std::shared_ptr<PartitionedOutputBuffer> buffer;
521
- {
522
- std::lock_guard<std::mutex> l (mutex_);
523
- auto iter = buffers_.find (taskId);
524
- if (iter == buffers_.end ()) {
525
- // Already removed.
526
- return ;
527
- }
528
- buffer = iter->second ;
529
- buffers_.erase (taskId);
521
+ auto buffer = buffers_.withLock (
522
+ [&](auto & buffers) -> std::shared_ptr<PartitionedOutputBuffer> {
523
+ auto it = buffers.find (taskId);
524
+ if (it == buffers.end ()) {
525
+ // Already removed.
526
+ return nullptr ;
527
+ }
528
+ auto taskBuffer = it->second ;
529
+ buffers.erase (taskId);
530
+ return taskBuffer;
531
+ });
532
+ if (buffer) {
533
+ buffer->terminate ();
530
534
}
531
- buffer->terminate ();
532
535
}
533
536
534
537
std::string PartitionedOutputBufferManager::toString () {
535
- std::stringstream out;
536
- out << " [BufferManager:" << std::endl;
537
- for (auto & pair : buffers_) {
538
- out << pair.first << " : " << pair.second ->toString () << std::endl;
539
- }
540
- out << " ]" ;
541
- return out.str ();
538
+ return buffers_.withLock ([](const auto & buffers) {
539
+ std::stringstream out;
540
+ out << " [BufferManager:" << std::endl;
541
+ for (const auto & pair : buffers) {
542
+ out << pair.first << " : " << pair.second ->toString () << std::endl;
543
+ }
544
+ out << " ]" ;
545
+ return out.str ();
546
+ });
542
547
}
543
548
544
549
} // namespace facebook::velox::exec
0 commit comments