Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogManager::disk_thread run closures in bthread after the log manager shutdown, to avoid deadlock #242

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/braft/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ LogManager::LogManager()
: _log_storage(NULL)
, _config_manager(NULL)
, _stopped(false)
, _shutdown(false)
, _has_error(false)
, _next_wait_id(0)
, _first_log_index(0)
Expand Down Expand Up @@ -245,6 +246,13 @@ class ResetClosure : public LogManager::StableClosure {
int64_t _next_log_index;
};

class ShutdownClosure : public LogManager::StableClosure {
public:
void Run() {
delete this;
}
};

int LogManager::truncate_prefix(const int64_t first_index_kept,
std::unique_lock<raft_mutex_t>& lck) {
std::deque<LogEntry*> saved_logs_in_memory;
Expand Down Expand Up @@ -505,7 +513,18 @@ class AppendBatcher {
EIO, "Corrupted LogStorage");
}
_storage[i]->update_metric(&metric);
_storage[i]->Run();
if (BAIDU_UNLIKELY(_lm->_shutdown)) {
// Run closure in bthread after the log manager shutdown,
// to avoid a corner case. In this case, the closure holds the
// last reference of NodeImpl, and done->Run() triggers the
// the destructor of NodeImpl. In the destructor, NodeImpl
// joins the disk thread to terminate, but the done->Run()
// itself is excuted in the disk thread. The disk thread can
// never be terminated!
run_closure_in_bthread(_storage[i]);
} else {
_storage[i]->Run();
}
}
_to_append.clear();
}
Expand Down Expand Up @@ -600,6 +619,11 @@ int LogManager::disk_thread(void* meta,
ret = log_manager->_log_storage->reset(rc->next_log_index());
break;
}
ShutdownClosure* sc = dynamic_cast<ShutdownClosure*>(done);
if (sc) {
ret = log_manager->_shutdown = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该不需要 ret 返回值

break;
}
} while (0);

if (ret != 0) {
Expand Down Expand Up @@ -816,6 +840,7 @@ void LogManager::set_applied_id(const LogId& applied_id) {
void LogManager::shutdown() {
std::unique_lock<raft_mutex_t> lck(_mutex);
_stopped = true;
CHECK_EQ(0, bthread::execution_queue_execute(_disk_queue, new ShutdownClosure));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的fix是不是还是有问题:
如果在 shutdown 之前有日志添加进来,在 shudown 之后没有新的日志添加进来。
那么 shutdown 之前的日志(最后一条),依然有可能在 disk_thread 里面执行,导致 自己 join 自己

wakeup_all_waiter(lck);
}

Expand Down
1 change: 1 addition & 0 deletions src/braft/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ friend class AppendBatcher;
raft_mutex_t _mutex;
butil::FlatMap<int64_t, WaitMeta*> _wait_map;
bool _stopped;
bool _shutdown;
butil::atomic<bool> _has_error;
WaitId _next_wait_id;

Expand Down