Skip to content

8 timer

bruceEeZhao edited this page Apr 22, 2023 · 1 revision

libgo中使用定时器有两种方式:

  1. 使用默认的timer,RoutineSyncTimer。这种情况下,起一个线程。默认情况下,超时任务都会加入到默认time维护的跳表中。
  2. 自定义timer,CoTimer。起一个协程,可以通过接口向自定义的timer中添加任务TimerId CoTimer::ExpireAt

本文档主要介绍自定义timer

注:

实际上,由于自定义的timer是一个协程,所以它的准确性并不高,因此不太建议使用该timer,使用sleep、usleep或co_sleep同样可以达到定时的目的。

RoutineSyncTimerT

为了说timer,首先说一下RoutineSyncTimerT

类图

Collaboration graph

代码

私有成员

先看一下RoutineSyncTimerT的私有成员:

class RoutineSyncTimerT
{
private:
    MutexT mtx_;
    container_type orderedList_;
    ConditionVariableT cv_;
    bool exit_ {false};
    int64_t nextCheckAbstime_ = 0;
}

其中orderedList_是其维护的一个跳表,跳表元素按照时间有序。container_type定义为

typedef LinkedSkipList<clock_type::time_point, FuncWrapper> container_type;

跳表元素由{K,V,links}组成,

struct Node
{
    PointPair links[MaxHeight];
    uint8_t height = 0;
    K key;
    V value;
};

重要函数

schedule

template<typename _Clock, typename _Duration>
void schedule(TimerId & id,
              const std::chrono::time_point<_Clock, _Duration> & abstime,
              func_type const& fn)
{
    clock_type::time_point tp = convert(abstime);
    id.key = tp;
    id.value.set(fn);
    orderedList_.buildNode(&id);  //随机构造元素的高度

    std::unique_lock<MutexT> lock(mtx_);
    insert(id);
}

该函数的作用是将跳表Node类型的元素id加入跳表中

TimerId定义为:

typedef typename container_type::Node TimerId;

即上面的:struct Node,是跳表元素的结构

run

run函数是一个死循环,不断检测跳表中的任务是否超时

void run()
{
    std::unique_lock<MutexT> lock(mtx_);
    while (!exit_) //未调用 stop()
    {
        TimerId* id = orderedList_.front(); // 取出 跳表 第一个元素
        auto nowTp = now();                 // 获取当前时间
        if (id && nowTp >= id->key) {       // 当前时间大于第一个元素的时间(定时时间)
            std::shared_ptr<MutexT> invoke_mtx = id->value.mutex();
            std::unique_lock<MutexT> invoke_lock(*invoke_mtx, std::defer_lock);
            bool locked = invoke_lock.try_lock();   // ABBA

            orderedList_.erase(id);         // 删除元素

            if (locked) {                   // 加锁成功
                lock.unlock();
                id->value.invoke();         // 执行 schedule 中注册的函数
                lock.lock();
            }
            continue;
        }

        std::chrono::milliseconds sleepTime(1);
        if (id) {                            // 队列中还有其他超时元素
            std::chrono::milliseconds delta = std::chrono::duration_cast<
                std::chrono::milliseconds>(id->key - nowTp);
            sleepTime = (std::min)(sleepTime, delta);  // 设置休眠时间
        } else {
            sleepTime = loop_interval();     // 默认的休眠时间 20ms
        }

        nextCheckAbstime_ = std::chrono::duration_cast<std::chrono::nanoseconds>((now() + sleepTime).time_since_epoch()).count();

        cv_.wait_for(lock, sleepTime); // 原子地释放 lock ,阻塞当前线程,并将它添加到等待在 *this 上的线程列表。
        // 线程将在执行 notify_all() 或 notify_one() 时,或度过相对时限 rel_time 时被解除阻塞。
    }
}

CoTimer

代码

class CoTimer
{
public:
    typedef ::libgo::RoutineSyncTimerT<::libgo::Mutex, ::libgo::ConditionVariable> CoroutineTimer;
    typedef CoroutineTimer::func_type func_t;

    struct TimerIdImpl
    {
    ......
    };
    typedef std::shared_ptr<TimerIdImpl> TimerIdImplPtr;

    struct TimerId
    {
    ......
    };

public:
    template <typename Rep, typename Period>
    explicit CoTimer(std::chrono::duration<Rep, Period> dur, Scheduler * scheduler = nullptr)
    {
        impl_.reset(new CoroutineTimer);
        Initialize(scheduler);
    }

    explicit CoTimer(Scheduler * scheduler = nullptr)
    {
        impl_.reset(new CoroutineTimer);
        Initialize(scheduler);
    }

    ~CoTimer();

    TimerId ExpireAt(FastSteadyClock::duration dur, func_t const& cb);

    TimerId ExpireAt(FastSteadyClock::time_point tp, func_t const& cb);

    template <typename Rep, typename Period>
    TimerId ExpireAt(std::chrono::duration<Rep, Period> dur, func_t const& fn) {
        return ExpireAt(std::chrono::duration_cast<FastSteadyClock::duration>(dur), fn);
    }

private:
    CoTimer(CoTimer const&) = delete;
    CoTimer& operator=(CoTimer const&) = delete;

    void Initialize(Scheduler * scheduler);

private:
    std::shared_ptr<CoroutineTimer> impl_;
};

从构造函数开始看起:

explicit CoTimer(Scheduler * scheduler = nullptr)
{
    impl_.reset(new CoroutineTimer);  // 设置智能指针
    Initialize(scheduler); // 初始化
}

void CoTimer::Initialize(Scheduler * scheduler)
{
    std::shared_ptr<CoroutineTimer> ptr = impl_;  
    go co_scheduler(scheduler) [ptr] {  // 启动一个协程,运行自定义的timer
        ptr->run();
    };
}

构造函数启动一个协程,运行自定义的timer,最终调用了ptr->run();

接下来看在自定义的timer中添加任务的函数

CoTimer::TimerId CoTimer::ExpireAt(FastSteadyClock::time_point tp, func_t const& cb)
{
    TimerIdImplPtr idImpl = std::make_shared<TimerIdImpl>(impl_);
    func_t f = [idImpl, cb] {
        cb();
    };
    impl_->schedule(idImpl->id_, tp, f);
    return TimerId(idImpl);
}

使用自定义的timer

#include "coroutine.h"

int main()
{
    // 创建一个定时器
    // 第一个参数: 精度
    // 第二个参数: 绑定到一个调度器(Scheduler)
    // 两个参数都有默认值, 可以简便地创建一个定时器: co_timer timer; 
    co_timer timer(std::chrono::milliseconds(1), &co_sched);

    // 使用timer.ExpireAt接口设置一个定时任务
    // 第一个参数可以是std::chrono中的时间长度,也可以是时间点。
    // 第二个参数是定时器回调函数
    // 返回一个co_timer_id类型的ID, 通过这个ID可以撤销还未执行的定时函数
    co_timer_id id1 = timer.ExpireAt(std::chrono::seconds(1), []{
            printf("Timer Callback.\n");
            });

    // co_timer_id::StopTimer接口可以撤销还未开始执行的定时函数
    // 它返回bool类型的结果,如果撤销成功,返回true;
    //     如果未来得及撤销,返回false, 此时不保证回调函数已执行完毕。
    bool cancelled = id1.StopTimer();
    printf("cancelled:%s\n", cancelled ? "true" : "false");

    timer.ExpireAt(std::chrono::seconds(2), [&]{
            printf("Timer Callback.\n");
            co_sched.Stop();
            });

    for (int i = 0; i < 100; ++i)
        go []{
            // 休眠当前协程 1000 milliseconds.
            // 不会阻塞线程, 因此100个并发的休眠, 总共只需要1秒.
            co_sleep(1000);
        };

    co_sched.Start();
    return 0;
}
Clone this wiki locally