Skip to content

5 锁机制

bruceEeZhao edited this page Apr 22, 2023 · 1 revision

[TOC]

概述

Why c++ coroutine?Why libgo?

在任何 C++ 协程库的使用中,都应该慎重使用或禁用线程锁,比如下面的代码

std::mutex mtx;

void foo() {
    std::unique_lock<std::mutex> lock(mtx);
    // switch coroutine by sleep
    sleep(1);
}

int main() {
    go foo;  // 协程A
    go foo;  // 协程B
    co_sched.RunLoop();
}

协程 A 首先被调度,加锁后调用 sleep 导致当前协程挂起,注意此时 mtx 已然是被锁定的。

然后协程 B 被调度,要等待 mtx 被解锁才能继续执行下去,由于 mtx 是线程锁,会阻塞调度线程,协程 A 再也不会有机会被调度,从而形成死锁。

这是一个典型的边角问题,因为我们无法阻止 C++ 程序员在使用协程库的同时再使用线程同步机制。

协程锁

其实我们可以提供一个协程锁来解决这一问题,比如下面的代码

co_mutex mtx;

void foo() {
    std::unique_lock<co_mutex> lock(mtx);
    // switch coroutine by sleep
    sleep(1);
}

int main() {
    go foo;  // 协程A
    go foo;  // 协程B
    co_sched.RunLoop();
}

代码与前一个例子几乎一样,唯一的区别是 mtx 的锁类型从线程锁变成了 libgo 提供的协程锁。

协程 A 首先被调度,加锁后调用 sleep 导致当前协程挂起,注意此时 mtx 已然是被锁定的。

然后协程 B 被调度,要等待 mtx 被解锁才能继续执行下去,由于 mtx 是协程锁,协程锁在等待时会挂起当前协程而不是阻塞线程,协程 A 在 sleep 时间结束后会被唤醒并被调度,协程 A 退出 foo 函数时会解锁,解锁的行为又会唤醒协程 B,协程 B 被调度时再次锁定 mtx,然后顺利完成整个逻辑。

协程读写锁

libgo 还提供了协程读写锁:co_rwmutex

其他

另外,即便开发者有意识的规避第一个例子那样的场景,也很容易踩到另外一个线程锁导致的坑,比如在使用 zookeeper-client 这样会启动后台线程来 call 回调函数的第三方库时:

std::mutex mtx;
int g_state;

void zookeeper_callback(int state) {
    std::unique_lock<std::mutex> lock(mtx);
    // 修改g_state
    g_state = state;
    
    // 这里忘了立即解锁, 做了其他耗时的事情
}

void routine() {
    std::unique_lock<std::mutex> lock(mtx);
    // 取g_state的值
}

go routine;

看起来好像没什么问题,但其实 routine 里面的线程锁会阻塞整个调度线程,使得其他协程都无法被及时调度。

针对这种情况最优雅的处理方式就是使用 Channel,因为 libgo 提供的 Channel 不仅可以用于协程间交换数据,也可以用于协程与线程间交换数据,可以说是专门针对 zk 这类起后台线程的第三方库设计的。

co_chan<int> state_c(10);

void zookeeper_callback(int state) {
    state_c << state;
}

void routine() {
    // 取g_state的值
    int state;
    state_c >> state;
}

go routine;

详见6-channel

协程锁

类图

image-20230131095749249

如何实现只锁定线程

下面以lock函数为例:

template <typename _Clock, typename _Duration>
bool lock(const std::chrono::time_point<_Clock, _Duration> * abstime)
{
    if (try_lock()) { // 加锁成功,直接返回
        return true;
    }

    int res = lock_contended(abstime);
    return RutexBase::rutex_wait_return_success == res;
}

其中try_lock()定义为

struct MutexInternal {
    std::atomic<unsigned char> locked;
    std::atomic<unsigned char> contended;
    unsigned short padding;
};

bool try_lock()
{
    MutexInternal* split = (MutexInternal*)rutex_.value();
    return 0 == split->locked.exchange(1, std::memory_order_acquire);
    // 如果原来的值是0,则说明未被加锁
    // 否则说明已被加锁
    // rutex_.value() {0,0,0} -> {1,0,0}
}

这里使用了std::atomic,使用原子变量时需要规定内存的读写顺序

内存顺序(Memory Order)问题(二)

exchange原子地替换原子对象的值并返回它先前持有的值,操作为读-修改-写操作。根据 order 的值影响内存。

因为编译器和CPU对程序指令的优化,导致代码逻辑顺序和实际指令执行顺序不一致。 因此,我们要用内存顺序来告诉编译器和CPU确保指令执行顺序和代码的逻辑顺序一致。

内存顺序 先后次序 语义
Acquire 读操作在前 读读、读写
Release 写操作在后 读写、写写

可以看出:要规约“写操作之前的写操作之间的顺序不能改变”(写写),得采用Release语义; 要规约“读操作之后的读操作,之间的顺序不能改变”(读读),得采用Acquire语义。

接下来,看加锁失败时的操作:

template<typename _Clock, typename _Duration>
inline int lock_contended(const std::chrono::time_point<_Clock, _Duration> * abstime) {
    std::atomic<unsigned>* whole = rutex_.value();
    while (whole->exchange(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED) & LIBGO_ROUTINE_SYNC_MUTEX_LOCKED) {
        int res = rutex_.wait_until(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED, abstime);

        if (RutexBase::rutex_wait_return_etimeout == res) {
            return res;
        }
    }
    return RutexBase::rutex_wait_return_success;
}

加锁失败时,使用一个死循环竞争锁。在rutex_.wait_until会调用sleep放弃CPU。

whole->exchange(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED 时,rutex_.value() {1,0,0} -> {1,1,0}, 返回值是{1,0,0}

wait_until的代码如下:

template<typename _Clock, typename _Duration>
rutex_wait_return wait_until(IntValueType expectValue,
                             const std::chrono::time_point<_Clock, _Duration> * abstime)
{
    // base_t::value() 调用基类函数获取当前值
    if (base_t::value()->load(std::memory_order_relaxed) != expectValue) {
        std::atomic_thread_fence(std::memory_order_acquire);
        return rutex_wait_return_ewouldblock;
    }

    RoutineSwitcherI* switcher = &RoutineSyncPolicy::clsRef();
    std::unique_ptr<RoutineSwitcherI> raii;
    if (!switcher->valid()) {
        // exit阶段, tls对象会较早被析构
        // 此时如果处于pthread中, 可以临时new一个出来用
        if (RoutineSyncPolicy::isInPThread()) {
            switcher = new PThreadSwitcher;
            raii.reset(switcher);
        } else {

            return rutex_wait_return_ewouldblock;
        }
    }

    RutexWaiter rw(*switcher);

    {
        std::unique_lock<std::mutex> lock(mtx_);
        if (base_t::value()->load(std::memory_order_relaxed) != expectValue) {
            return rutex_wait_return_ewouldblock;
        }

        int state = rw.state();
        switch (state) {
            case RutexWaiter::waiter_state_interrupted:
                return rutex_wait_return_eintr;

            case RutexWaiter::waiter_state_ready:
                return rutex_wait_return_success;
        }

        assert(!rw.safe_unlink());
        rw.mark();            // 协程切换状态为suspend
        waiters_.push(&rw);   // 放入等待队列
        rw.owner_.store(this, std::memory_order_relaxed);
    }

    rw.sleep(abstime);   // 加入timer队列,切换上下文,放弃CPU
    rw.join();

    int state = rw.state();
    switch (state) {
        case RutexWaiter::waiter_state_none:
        case RutexWaiter::waiter_state_interrupted:
            return rutex_wait_return_eintr;

        case RutexWaiter::waiter_state_timeout:
            return rutex_wait_return_etimeout;
    }
    return rutex_wait_return_success;
}

加锁、解锁、竞争锁时变量的变化

一个Mutex对象拥有一个Rutex<unsigned> rutex_对象,加锁解锁竞争锁的过程实际上也是对该对象的操作。

Rutex继承了3个类,那么它也就拥有了父类的成员,拥有的成员变量包括:

struct Rutex : public IntValue<IntValueType, Reference>, public RutexBase, public DebuggerId<RutexBase>
{
protected:
    friend struct RutexWaiter;
    LinkedList waiters_;
    std::mutex mtx_;
    std::atomic<IntValueType> value_ {0};
private:
    long id_;
}

接下来我们假设共有3个协程,分别是A,B,C,按照字母顺序依次请求锁

加锁

bool try_lock()
{
    MutexInternal* split = (MutexInternal*)rutex_.value();
    return 0 == split->locked.exchange(1, std::memory_order_acquire);
}

bool lock(const std::chrono::time_point<_Clock, _Duration> * abstime)
{
    if (try_lock()) { // 加锁成功
        return true;
    }

    int res = lock_contended(abstime);
    return RutexBase::rutex_wait_return_success == res;
}

A尝试加锁成功

这种情况是第一个协程尝试进行加锁,因为锁还未使用,因此可以成功加锁,对应代码中的if部分,

此时,value_的值由{0,0,0}变为{1,0,0}

B尝试加锁失败

这种情况是一个协程还未解锁,另一个协程尝试加锁,导致加锁失败,对应代码中的lock_contended(abstime)部分,

inline int lock_contended(const std::chrono::time_point<_Clock, _Duration> * abstime) {
    std::atomic<unsigned>* whole = rutex_.value();
    while (whole->exchange(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED) & LIBGO_ROUTINE_SYNC_MUTEX_LOCKED) {
        int res = rutex_.wait_until(LIBGO_ROUTINE_SYNC_MUTEX_CONTENDED, abstime);

        if (RutexBase::rutex_wait_return_etimeout == res) {
            return res;
        }
    }
    return RutexBase::rutex_wait_return_success;
}

第二行value_的值为{1,0,0}

第三行value_的值由{1,0,0} -> {1,1,0}, 同时whole->exchange的返回值为{1,0,0}

接下来,调用wait_until进行锁等待操作,下面的代码只保留了关键部分

rutex_wait_return wait_until(IntValueType expectValue,
                             const std::chrono::time_point<_Clock, _Duration> * abstime)
{
    RutexWaiter rw(*switcher);
    {
        std::unique_lock<std::mutex> lock(mtx_);
        if (base_t::value()->load(std::memory_order_relaxed) != expectValue) {
            return rutex_wait_return_ewouldblock;
        }

        int state = rw.state(); // 0
        switch (state) {
            case RutexWaiter::waiter_state_interrupted: // 2
                return rutex_wait_return_eintr;

            case RutexWaiter::waiter_state_ready:  // 1
                return rutex_wait_return_success;
        }

        assert(!rw.safe_unlink());
        rw.mark();            // 协程切换状态为suspend
        waiters_.push(&rw);   // 放入等待队列
        rw.owner_.store(this, std::memory_order_relaxed); // 原子操作, owner_ = this
    }

    rw.sleep(abstime);   // 1. abstime > 0, 加入timer, 放弃CPU
    					 // 2. abstime == 0, 放弃CPU
    //---------唤醒后----------
    rw.join();

    int state = rw.state();
    switch (state) {
        case RutexWaiter::waiter_state_none:        // 0
        case RutexWaiter::waiter_state_interrupted: // 2
            return rutex_wait_return_eintr;

        case RutexWaiter::waiter_state_timeout:     // 3
            return rutex_wait_return_etimeout;
    }
    return rutex_wait_return_success;
}

C尝试加锁失败

第二行value_的值为{1,1,0}

第三行value_的值由{1,1,0} -> {1,1,0}, 同时whole->exchange的返回值为{1,1,0}

解锁

void unlock()
{
    std::atomic<unsigned>* whole = (std::atomic<unsigned>*)rutex_.value();
    const unsigned prev = whole->exchange(0, std::memory_order_release);
    if (prev == LIBGO_ROUTINE_SYNC_MUTEX_LOCKED) {
        return ;
    }

    int res = rutex_.notify_one();
    (void)res;
}

第三行,解锁时whole有两种情况,{1,0,0}{1,1,0}

第四行,设置value_的值为{0,0,0},并返回原来的值

如果是{1,0,0}则表明等待队列中没有其他协程在等待该锁的释放,直接返回

如果是{1,1,0}则表明有其他协程等待,则唤醒一个等待的协程。

notify_one虽然名字是唤醒一个,==但实际上是对等待队列中的全部元素依次遍历==

int notify_one()
{
    for (;;) {
        std::unique_lock<std::mutex> lock(mtx_);  // 加线程锁
        RutexWaiter *rw = static_cast<RutexWaiter *>(waiters_.front()); // 取出等待队列中的第一个元素
        if (!rw) {
            return 0;
        }

        // 先lock,后unlink, 和join里面的顺序形成ABBA, 确保join可以等到这个函数结束 
        std::unique_lock<std::mutex> lock2(rw->wakeMtx_, std::defer_lock);
        bool isLock = lock2.try_lock();

        waiters_.unlink(rw);
        rw->owner_.store(nullptr, std::memory_order_relaxed); // owner_ = null

        if (!isLock) {
            continue;
        }

        lock.unlock();
        if (rw->wake()) {
            return 1;
        } else {
            RS_DBG(dbg_rutex, "rutex=%ld | %s | rw=%ld wake failed",
                   id(), __func__, rw->id());
        }
    }
}
bool wake(int state = waiter_state_ready) {
    if (waked_.load(std::memory_order_relaxed)) // if waked_ == 1, 表示已经唤醒过了
        return false;

    if (!switcher_->wake()) {
        return false;
    }

    state_ = state; // state = 1
    waked_.store(true, std::memory_order_relaxed); // waked_ = 1
    return true;
}

协程读写锁

类图

img