forked from yyzybb537/libgo
-
Notifications
You must be signed in to change notification settings - Fork 2
8 timer
bruceEeZhao edited this page Apr 22, 2023
·
1 revision
libgo中使用定时器有两种方式:
- 使用默认的timer,
RoutineSyncTimer
。这种情况下,起一个线程。默认情况下,超时任务都会加入到默认time维护的跳表中。 - 自定义timer,
CoTimer
。起一个协程,可以通过接口向自定义的timer中添加任务TimerId CoTimer::ExpireAt
。
本文档主要介绍自定义timer
注:
实际上,由于自定义的timer是一个协程,所以它的准确性并不高,因此不太建议使用该timer,使用sleep、usleep或co_sleep同样可以达到定时的目的。
为了说timer,首先说一下RoutineSyncTimerT
先看一下RoutineSyncTimerT
的私有成员:
class RoutineSyncTimerT
{
private:
MutexT mtx_;
container_type orderedList_;
ConditionVariableT cv_;
bool exit_ {false};
int64_t nextCheckAbstime_ = 0;
}
其中orderedList_
是其维护的一个跳表,跳表元素按照时间有序。container_type
定义为
typedef LinkedSkipList<clock_type::time_point, FuncWrapper> container_type;
跳表元素由{K,V,links}组成,
struct Node
{
PointPair links[MaxHeight];
uint8_t height = 0;
K key;
V value;
};
template<typename _Clock, typename _Duration>
void schedule(TimerId & id,
const std::chrono::time_point<_Clock, _Duration> & abstime,
func_type const& fn)
{
clock_type::time_point tp = convert(abstime);
id.key = tp;
id.value.set(fn);
orderedList_.buildNode(&id); //随机构造元素的高度
std::unique_lock<MutexT> lock(mtx_);
insert(id);
}
该函数的作用是将跳表Node类型的元素id加入跳表中
TimerId定义为:
typedef typename container_type::Node TimerId;
即上面的:struct Node
,是跳表元素的结构
run函数是一个死循环,不断检测跳表中的任务是否超时
void run()
{
std::unique_lock<MutexT> lock(mtx_);
while (!exit_) //未调用 stop()
{
TimerId* id = orderedList_.front(); // 取出 跳表 第一个元素
auto nowTp = now(); // 获取当前时间
if (id && nowTp >= id->key) { // 当前时间大于第一个元素的时间(定时时间)
std::shared_ptr<MutexT> invoke_mtx = id->value.mutex();
std::unique_lock<MutexT> invoke_lock(*invoke_mtx, std::defer_lock);
bool locked = invoke_lock.try_lock(); // ABBA
orderedList_.erase(id); // 删除元素
if (locked) { // 加锁成功
lock.unlock();
id->value.invoke(); // 执行 schedule 中注册的函数
lock.lock();
}
continue;
}
std::chrono::milliseconds sleepTime(1);
if (id) { // 队列中还有其他超时元素
std::chrono::milliseconds delta = std::chrono::duration_cast<
std::chrono::milliseconds>(id->key - nowTp);
sleepTime = (std::min)(sleepTime, delta); // 设置休眠时间
} else {
sleepTime = loop_interval(); // 默认的休眠时间 20ms
}
nextCheckAbstime_ = std::chrono::duration_cast<std::chrono::nanoseconds>((now() + sleepTime).time_since_epoch()).count();
cv_.wait_for(lock, sleepTime); // 原子地释放 lock ,阻塞当前线程,并将它添加到等待在 *this 上的线程列表。
// 线程将在执行 notify_all() 或 notify_one() 时,或度过相对时限 rel_time 时被解除阻塞。
}
}
class CoTimer
{
public:
typedef ::libgo::RoutineSyncTimerT<::libgo::Mutex, ::libgo::ConditionVariable> CoroutineTimer;
typedef CoroutineTimer::func_type func_t;
struct TimerIdImpl
{
......
};
typedef std::shared_ptr<TimerIdImpl> TimerIdImplPtr;
struct TimerId
{
......
};
public:
template <typename Rep, typename Period>
explicit CoTimer(std::chrono::duration<Rep, Period> dur, Scheduler * scheduler = nullptr)
{
impl_.reset(new CoroutineTimer);
Initialize(scheduler);
}
explicit CoTimer(Scheduler * scheduler = nullptr)
{
impl_.reset(new CoroutineTimer);
Initialize(scheduler);
}
~CoTimer();
TimerId ExpireAt(FastSteadyClock::duration dur, func_t const& cb);
TimerId ExpireAt(FastSteadyClock::time_point tp, func_t const& cb);
template <typename Rep, typename Period>
TimerId ExpireAt(std::chrono::duration<Rep, Period> dur, func_t const& fn) {
return ExpireAt(std::chrono::duration_cast<FastSteadyClock::duration>(dur), fn);
}
private:
CoTimer(CoTimer const&) = delete;
CoTimer& operator=(CoTimer const&) = delete;
void Initialize(Scheduler * scheduler);
private:
std::shared_ptr<CoroutineTimer> impl_;
};
从构造函数开始看起:
explicit CoTimer(Scheduler * scheduler = nullptr)
{
impl_.reset(new CoroutineTimer); // 设置智能指针
Initialize(scheduler); // 初始化
}
void CoTimer::Initialize(Scheduler * scheduler)
{
std::shared_ptr<CoroutineTimer> ptr = impl_;
go co_scheduler(scheduler) [ptr] { // 启动一个协程,运行自定义的timer
ptr->run();
};
}
构造函数启动一个协程,运行自定义的timer,最终调用了ptr->run();
。
接下来看在自定义的timer中添加任务的函数
CoTimer::TimerId CoTimer::ExpireAt(FastSteadyClock::time_point tp, func_t const& cb)
{
TimerIdImplPtr idImpl = std::make_shared<TimerIdImpl>(impl_);
func_t f = [idImpl, cb] {
cb();
};
impl_->schedule(idImpl->id_, tp, f);
return TimerId(idImpl);
}
#include "coroutine.h"
int main()
{
// 创建一个定时器
// 第一个参数: 精度
// 第二个参数: 绑定到一个调度器(Scheduler)
// 两个参数都有默认值, 可以简便地创建一个定时器: co_timer timer;
co_timer timer(std::chrono::milliseconds(1), &co_sched);
// 使用timer.ExpireAt接口设置一个定时任务
// 第一个参数可以是std::chrono中的时间长度,也可以是时间点。
// 第二个参数是定时器回调函数
// 返回一个co_timer_id类型的ID, 通过这个ID可以撤销还未执行的定时函数
co_timer_id id1 = timer.ExpireAt(std::chrono::seconds(1), []{
printf("Timer Callback.\n");
});
// co_timer_id::StopTimer接口可以撤销还未开始执行的定时函数
// 它返回bool类型的结果,如果撤销成功,返回true;
// 如果未来得及撤销,返回false, 此时不保证回调函数已执行完毕。
bool cancelled = id1.StopTimer();
printf("cancelled:%s\n", cancelled ? "true" : "false");
timer.ExpireAt(std::chrono::seconds(2), [&]{
printf("Timer Callback.\n");
co_sched.Stop();
});
for (int i = 0; i < 100; ++i)
go []{
// 休眠当前协程 1000 milliseconds.
// 不会阻塞线程, 因此100个并发的休眠, 总共只需要1秒.
co_sleep(1000);
};
co_sched.Start();
return 0;
}