-
Notifications
You must be signed in to change notification settings - Fork 2
5 锁机制
[TOC]
在任何 C++ 协程库的使用中,都应该慎重使用或禁用线程锁,比如下面的代码
std::mutex mtx; void foo() { std::unique_lock<std::mutex> lock(mtx); // switch coroutine by sleep sleep(1); } int main() { go foo; // 协程A go foo; // 协程B co_sched.RunLoop(); }协程 A 首先被调度,加锁后调用 sleep 导致当前协程挂起,注意此时 mtx 已然是被锁定的。
然后协程 B 被调度,要等待 mtx 被解锁才能继续执行下去,由于 mtx 是线程锁,会阻塞调度线程,协程 A 再也不会有机会被调度,从而形成死锁。
这是一个典型的边角问题,因为我们无法阻止 C++ 程序员在使用协程库的同时再使用线程同步机制。
其实我们可以提供一个协程锁来解决这一问题,比如下面的代码
co_mutex mtx;
void foo() {
std::unique_lock<co_mutex> lock(mtx);
// switch coroutine by sleep
sleep(1);
}
int main() {
go foo; // 协程A
go foo; // 协程B
co_sched.RunLoop();
}
代码与前一个例子几乎一样,唯一的区别是 mtx 的锁类型从线程锁变成了 libgo 提供的协程锁。
协程 A 首先被调度,加锁后调用 sleep 导致当前协程挂起,注意此时 mtx 已然是被锁定的。
然后协程 B 被调度,要等待 mtx 被解锁才能继续执行下去,由于 mtx 是协程锁,协程锁在等待时会挂起当前协程而不是阻塞线程,协程 A 在 sleep 时间结束后会被唤醒并被调度,协程 A 退出 foo 函数时会解锁,解锁的行为又会唤醒协程 B,协程 B 被调度时再次锁定 mtx,然后顺利完成整个逻辑。
libgo 还提供了协程读写锁:co_rwmutex
另外,即便开发者有意识的规避第一个例子那样的场景,也很容易踩到另外一个线程锁导致的坑,比如在使用 zookeeper-client 这样会启动后台线程来 call 回调函数的第三方库时:
std::mutex mtx;
int g_state;
void zookeeper_callback(int state) {
std::unique_lock<std::mutex> lock(mtx);
// 修改g_state
g_state = state;
// 这里忘了立即解锁, 做了其他耗时的事情
}
void routine() {
std::unique_lock<std::mutex> lock(mtx);
// 取g_state的值
}
go routine;
看起来好像没什么问题,但其实 routine 里面的线程锁会阻塞整个调度线程,使得其他协程都无法被及时调度。
针对这种情况最优雅的处理方式就是使用 Channel,因为 libgo 提供的 Channel 不仅可以用于协程间交换数据,也可以用于协程与线程间交换数据,可以说是专门针对 zk 这类起后台线程的第三方库设计的。
co_chan<int> state_c(10);
void zookeeper_callback(int state) {
state_c << state;
}
void routine() {
// 取g_state的值
int state;
state_c >> state;
}
go routine;
下面以lock函数为例:
template <typename _Clock, typename _Duration>
bool lock(const std::chrono::time_point<_Clock, _Duration> * abstime)
{
if (try_lock()) { // 加锁成功,直接返回
return true;
}
int res = lock_contended(abstime);
return RutexBase::rutex_wait_return_success == res;
}
其中try_lock()
定义为
struct MutexInternal {
std::atomic<unsigned char> locked;
std::atomic<unsigned char> contended;
unsigned short padding;
};
bool try_lock()
{
MutexInternal* split = (MutexInternal*)rutex_.value();
return 0 == split->locked.exchange(1, std::memory_order_acquire);
// 如果原来的值是0,则说明未被加锁
// 否则说明已被加锁
// rutex_.value() {0,0,0} -> {1,0,0}
}
这里使用了std::atomic
,使用原子变量时需要规定内存的读写顺序
exchange
原子地替换原子对象的值并返回它先前持有的值,操作为读-修改-写操作。根据order
的值影响内存。因为编译器和CPU对程序指令的优化,导致代码逻辑顺序和实际指令执行顺序不一致。 因此,我们要用内存顺序来告诉编译器和CPU确保指令执行顺序和代码的逻辑顺序一致。
内存顺序 先后次序 语义 Acquire 读操作在前 读读、读写 Release 写操作在后 读写、写写 可以看出:要规约“写操作之前的写操作之间的顺序不能改变”(写写),得采用Release语义; 要规约“读操作之后的读操作,之间的顺序不能改变”(读读),得采用Acquire语义。
接下来,看加锁失败时的操作:
template<typename _Clock, typename _Duration>
inline int lock_contended(const std::chrono::time_point<_Clock, _Duration> * abstime) {
std::atomic<unsigned>* whole = rutex_.value();
while (whole->exchange(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED) & LIBGO_ROUTINE_SYNC_MUTEX_LOCKED) {
int res = rutex_.wait_until(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED, abstime);
if (RutexBase::rutex_wait_return_etimeout == res) {
return res;
}
}
return RutexBase::rutex_wait_return_success;
}
加锁失败时,使用一个死循环竞争锁。在rutex_.wait_until
会调用sleep放弃CPU。
whole->exchange(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED
时,rutex_.value() {1,0,0} -> {1,1,0}, 返回值是{1,0,0}
wait_until的代码如下:
template<typename _Clock, typename _Duration>
rutex_wait_return wait_until(IntValueType expectValue,
const std::chrono::time_point<_Clock, _Duration> * abstime)
{
// base_t::value() 调用基类函数获取当前值
if (base_t::value()->load(std::memory_order_relaxed) != expectValue) {
std::atomic_thread_fence(std::memory_order_acquire);
return rutex_wait_return_ewouldblock;
}
RoutineSwitcherI* switcher = &RoutineSyncPolicy::clsRef();
std::unique_ptr<RoutineSwitcherI> raii;
if (!switcher->valid()) {
// exit阶段, tls对象会较早被析构
// 此时如果处于pthread中, 可以临时new一个出来用
if (RoutineSyncPolicy::isInPThread()) {
switcher = new PThreadSwitcher;
raii.reset(switcher);
} else {
return rutex_wait_return_ewouldblock;
}
}
RutexWaiter rw(*switcher);
{
std::unique_lock<std::mutex> lock(mtx_);
if (base_t::value()->load(std::memory_order_relaxed) != expectValue) {
return rutex_wait_return_ewouldblock;
}
int state = rw.state();
switch (state) {
case RutexWaiter::waiter_state_interrupted:
return rutex_wait_return_eintr;
case RutexWaiter::waiter_state_ready:
return rutex_wait_return_success;
}
assert(!rw.safe_unlink());
rw.mark(); // 协程切换状态为suspend
waiters_.push(&rw); // 放入等待队列
rw.owner_.store(this, std::memory_order_relaxed);
}
rw.sleep(abstime); // 加入timer队列,切换上下文,放弃CPU
rw.join();
int state = rw.state();
switch (state) {
case RutexWaiter::waiter_state_none:
case RutexWaiter::waiter_state_interrupted:
return rutex_wait_return_eintr;
case RutexWaiter::waiter_state_timeout:
return rutex_wait_return_etimeout;
}
return rutex_wait_return_success;
}
一个Mutex对象拥有一个Rutex<unsigned> rutex_
对象,加锁解锁竞争锁的过程实际上也是对该对象的操作。
Rutex
继承了3个类,那么它也就拥有了父类的成员,拥有的成员变量包括:
struct Rutex : public IntValue<IntValueType, Reference>, public RutexBase, public DebuggerId<RutexBase>
{
protected:
friend struct RutexWaiter;
LinkedList waiters_;
std::mutex mtx_;
std::atomic<IntValueType> value_ {0};
private:
long id_;
}
接下来我们假设共有3个协程,分别是A,B,C,按照字母顺序依次请求锁
bool try_lock()
{
MutexInternal* split = (MutexInternal*)rutex_.value();
return 0 == split->locked.exchange(1, std::memory_order_acquire);
}
bool lock(const std::chrono::time_point<_Clock, _Duration> * abstime)
{
if (try_lock()) { // 加锁成功
return true;
}
int res = lock_contended(abstime);
return RutexBase::rutex_wait_return_success == res;
}
这种情况是第一个协程尝试进行加锁,因为锁还未使用,因此可以成功加锁,对应代码中的if部分,
此时,value_
的值由{0,0,0}
变为{1,0,0}
这种情况是一个协程还未解锁,另一个协程尝试加锁,导致加锁失败,对应代码中的lock_contended(abstime)
部分,
inline int lock_contended(const std::chrono::time_point<_Clock, _Duration> * abstime) {
std::atomic<unsigned>* whole = rutex_.value();
while (whole->exchange(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED) & LIBGO_ROUTINE_SYNC_MUTEX_LOCKED) {
int res = rutex_.wait_until(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED, abstime);
if (RutexBase::rutex_wait_return_etimeout == res) {
return res;
}
}
return RutexBase::rutex_wait_return_success;
}
第二行value_
的值为{1,0,0}
第三行value_
的值由{1,0,0} -> {1,1,0}
, 同时whole->exchange
的返回值为{1,0,0}
接下来,调用wait_until
进行锁等待操作,下面的代码只保留了关键部分
rutex_wait_return wait_until(IntValueType expectValue,
const std::chrono::time_point<_Clock, _Duration> * abstime)
{
RutexWaiter rw(*switcher);
{
std::unique_lock<std::mutex> lock(mtx_);
if (base_t::value()->load(std::memory_order_relaxed) != expectValue) {
return rutex_wait_return_ewouldblock;
}
int state = rw.state(); // 0
switch (state) {
case RutexWaiter::waiter_state_interrupted: // 2
return rutex_wait_return_eintr;
case RutexWaiter::waiter_state_ready: // 1
return rutex_wait_return_success;
}
assert(!rw.safe_unlink());
rw.mark(); // 协程切换状态为suspend
waiters_.push(&rw); // 放入等待队列
rw.owner_.store(this, std::memory_order_relaxed); // 原子操作, owner_ = this
}
rw.sleep(abstime); // 1. abstime > 0, 加入timer, 放弃CPU
// 2. abstime == 0, 放弃CPU
//---------唤醒后----------
rw.join();
int state = rw.state();
switch (state) {
case RutexWaiter::waiter_state_none: // 0
case RutexWaiter::waiter_state_interrupted: // 2
return rutex_wait_return_eintr;
case RutexWaiter::waiter_state_timeout: // 3
return rutex_wait_return_etimeout;
}
return rutex_wait_return_success;
}
第二行value_
的值为{1,1,0}
第三行value_
的值由{1,1,0} -> {1,1,0}
, 同时whole->exchange
的返回值为{1,1,0}
void unlock()
{
std::atomic<unsigned>* whole = (std::atomic<unsigned>*)rutex_.value();
const unsigned prev = whole->exchange(0, std::memory_order_release);
if (prev == LIBGO_ROUTINE_SYNC_MUTEX_LOCKED) {
return ;
}
int res = rutex_.notify_one();
(void)res;
}
第三行,解锁时whole
有两种情况,{1,0,0}
和{1,1,0}
第四行,设置value_
的值为{0,0,0}
,并返回原来的值
如果是{1,0,0}
则表明等待队列中没有其他协程在等待该锁的释放,直接返回
如果是{1,1,0}
则表明有其他协程等待,则唤醒一个等待的协程。
notify_one
虽然名字是唤醒一个,==但实际上是对等待队列中的全部元素依次遍历==
int notify_one()
{
for (;;) {
std::unique_lock<std::mutex> lock(mtx_); // 加线程锁
RutexWaiter *rw = static_cast<RutexWaiter *>(waiters_.front()); // 取出等待队列中的第一个元素
if (!rw) {
return 0;
}
// 先lock,后unlink, 和join里面的顺序形成ABBA, 确保join可以等到这个函数结束
std::unique_lock<std::mutex> lock2(rw->wakeMtx_, std::defer_lock);
bool isLock = lock2.try_lock();
waiters_.unlink(rw);
rw->owner_.store(nullptr, std::memory_order_relaxed); // owner_ = null
if (!isLock) {
continue;
}
lock.unlock();
if (rw->wake()) {
return 1;
} else {
RS_DBG(dbg_rutex, "rutex=%ld | %s | rw=%ld wake failed",
id(), __func__, rw->id());
}
}
}
bool wake(int state = waiter_state_ready) {
if (waked_.load(std::memory_order_relaxed)) // if waked_ == 1, 表示已经唤醒过了
return false;
if (!switcher_->wake()) {
return false;
}
state_ = state; // state = 1
waked_.store(true, std::memory_order_relaxed); // waked_ = 1
return true;
}