diff --git a/src/braft/log_manager.cpp b/src/braft/log_manager.cpp index 6c5aa275..6425491b 100644 --- a/src/braft/log_manager.cpp +++ b/src/braft/log_manager.cpp @@ -64,6 +64,7 @@ LogManager::LogManager() : _log_storage(NULL) , _config_manager(NULL) , _stopped(false) + , _shutdown(false) , _has_error(false) , _next_wait_id(0) , _first_log_index(0) @@ -245,6 +246,13 @@ class ResetClosure : public LogManager::StableClosure { int64_t _next_log_index; }; +class ShutdownClosure : public LogManager::StableClosure { +public: + void Run() { + delete this; + } +}; + int LogManager::truncate_prefix(const int64_t first_index_kept, std::unique_lock& lck) { std::deque saved_logs_in_memory; @@ -505,7 +513,18 @@ class AppendBatcher { EIO, "Corrupted LogStorage"); } _storage[i]->update_metric(&metric); - _storage[i]->Run(); + if (BAIDU_UNLIKELY(_lm->_shutdown)) { + // Run closure in bthread after the log manager shutdown, + // to avoid a corner case. In this case, the closure holds the + // last reference of NodeImpl, and done->Run() triggers the + // the destructor of NodeImpl. In the destructor, NodeImpl + // joins the disk thread to terminate, but the done->Run() + // itself is excuted in the disk thread. The disk thread can + // never be terminated! + run_closure_in_bthread(_storage[i]); + } else { + _storage[i]->Run(); + } } _to_append.clear(); } @@ -600,6 +619,11 @@ int LogManager::disk_thread(void* meta, ret = log_manager->_log_storage->reset(rc->next_log_index()); break; } + ShutdownClosure* sc = dynamic_cast(done); + if (sc) { + ret = log_manager->_shutdown = true; + break; + } } while (0); if (ret != 0) { @@ -816,6 +840,7 @@ void LogManager::set_applied_id(const LogId& applied_id) { void LogManager::shutdown() { std::unique_lock lck(_mutex); _stopped = true; + CHECK_EQ(0, bthread::execution_queue_execute(_disk_queue, new ShutdownClosure)); wakeup_all_waiter(lck); } diff --git a/src/braft/log_manager.h b/src/braft/log_manager.h index fcd52dc3..273073da 100644 --- a/src/braft/log_manager.h +++ b/src/braft/log_manager.h @@ -207,6 +207,7 @@ friend class AppendBatcher; raft_mutex_t _mutex; butil::FlatMap _wait_map; bool _stopped; + bool _shutdown; butil::atomic _has_error; WaitId _next_wait_id;