Skip to content
bruceEeZhao edited this page Apr 22, 2023 · 1 revision

[TOC]

本节为了梳理libgo中gc相关的内容

scheduler

默认的scheduler位于栈中,在程序退出时不需要手动清理空间,但通过Create创建的scheduler实例位于堆中,在退出之前需要做空间清理,于是libgo设计了一个gc机制,负责退出前的scheduler实例的空间回收。

该机制通过使用atexit函数向系统注册在程序退出(调用exit或main函数返回时)时需要执行的函数,这里注册了一个delete函数,会执行析构函数。

主要的函数为InitOnExit:在程序退出时会执行onExit函数,完成gc。

static int InitOnExit() {
    atexit(&onExit); // register a function to be called at normal process termination
    return 0;
}

onExit定义为:

static void onExit(void) {
    auto vec = ExitList();
    for (auto fn : *vec) {
        fn();
    }
    vec->clear();
}

该函数遍历vec数组,执行数组中的每一个函数,执行完成之后释放vec本身的空间,

vec数组在向其中添加元素时需要是线程安全的,为此定义了一个全局锁。定义如下:

// 用于获取ExitList的全局锁变量
std::mutex& ExitListMtx()
{
    static std::mutex mtx;
    return mtx;
}

std::vector<std::function<void()>>* ExitList()
{
    static std::vector<std::function<void()>> *vec = new std::vector<std::function<void()>>;
    return vec;
}

在执行Create函数时会将delete函数加入vec中

Scheduler* Scheduler::Create()
{
    static int ignore = InitOnExit();
    (void)ignore;

    Scheduler* sched = new Scheduler;
    std::unique_lock<std::mutex> lock(ExitListMtx()); // for thread safe, vec
    auto vec = ExitList();
    vec->push_back([=] { delete sched; });
    return sched;
}

实际的gc会执行scheduler的析构函数,析构函数定义为:

Scheduler::~Scheduler()
{
    IsExiting() = true;
    Stop();
}

void Scheduler::Stop()
{
    std::unique_lock<std::mutex> lock(stopMtx_);

    if (stop_) return;

    stop_ = true;
    size_t n = processers_.size();
    for (size_t i = 0; i < n; ++i) {
        auto p = processers_[i];
        if (p)
            p->NotifyCondition(); // TODO
    }

    if (timer_) timer_->stop();

    if (dispatchThread_.joinable())
        dispatchThread_.join();
}

注:

static 的变量是唯一的,只会被初始化一次

Task GC

每一个processer都维护了一个gc队列gcQueue,执行完毕的协程会被加入该队列中。

下面给出一个代码片段,该代码描述了对状态为done的task的处理:

case TaskState::done:
default:
{
    runnableQueue_.next(runningTask_, nextTask_); // nextTask_ = runningTask_->next
    if (!nextTask_ && addNewQuota_ > 0) {
        if (AddNewTasks()) {
            runnableQueue_.next(runningTask_, nextTask_);
            -- addNewQuota_;
        }
    }

    DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo());
    runnableQueue_.erase(runningTask_);  // 移出该 task
    if (gcQueue_.size() > 16)
        GC();
    gcQueue_.push(runningTask_);  // 加入gc队列
    if (runningTask_->eptr_) {    // 错误处理
        std::exception_ptr ep = runningTask_->eptr_;
        std::rethrow_exception(ep);
    }

    std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef());
    runningTask_ = nextTask_; // 设置下一个要执行的task
    nextTask_ = nullptr;
}
break;

加入gc队列之前会对gc队列进行判断,如果队列长度大于16,则进行一次gc。

gc实际上就是把队列中的task依次取出,并执行DecrementRef,

void Processer::GC()
{
    auto list = gcQueue_.pop_all();
    for (Task & tk : list) {
        tk.DecrementRef();
    }
    list.clear();
}

调用GC函数的还有一个方法:

void Processer::WaitCondition()
{
    GC();
    std::unique_lock<TaskQueue::lock_t> lock(newQueue_.LockRef());
    if (notified_) {
        DebugPrint(dbg_scheduler, "WaitCondition by Notified. [Proc(%d)] --------------------------", id_);
        notified_ = false;
        return ;
    }

    waiting_ = true;
    DebugPrint(dbg_scheduler, "WaitCondition. [Proc(%d)] --------------------------", id_);
    cv_.wait(lock);
    waiting_ = false;
}

为什么使用GC

libgo中为task添加了引用计数,只有引用计数为0时才会被删除。Task 继承了类 SharedRefObject

virtual bool RefObject::DecrementRef()
{
    if (--*reference_ == 0) {
        deleter_(this);
        return true;
    }
    return false;
}

^
|
|
    
virtual bool SharedRefObject::DecrementRef()
{
    RefObjectImpl * impl = impl_;
    if (RefObject::DecrementRef()) {
        std::atomic_thread_fence(std::memory_order_acq_rel);
        impl->DecrementWeak();
        return true;
    }
    return false;
}

但实际上,GC函数并没有理睬引用计数,一股脑全删除了。。 TODO

void Processer::GC()
{
    auto list = gcQueue_.pop_all();
    for (Task & tk : list) {
        tk.DecrementRef();
    }
    list.clear();
}
Clone this wiki locally