timer 模块
timer的定义,cyberrt中timer模块用于设置定时器任务,字面意思,设置设置定时周期及出发频次(周期 or oneshot),到达指定时间时间触发callback
time wheel
时钟节拍轮,常见的定时器设计,例如ucos中的定时器,Linux Crontab等,cyberrt也是采用了时钟轮
时间轮(TimingWheel)简单来说,是一个 存储定时任务的循环队列,队列中的每个元素都可以放置一个定时任务列表(TimeBucket) 。TimeBucket 是一个list,链表中的每一项表示的都是定时任务(TimerTask)。
时钟轮示意图
类图
@startuml
class Timer{
+explicit Timer(TimerOption opt)
+void Start()
+void Stop()
-TimerOption timer_opt_;
-TimingWheel* timing_wheel_ = nullptr;
-std::shared_ptr<TimerTask> task_;
}
class TimingWheel {- TimerBucket work_wheel_[WORK_WHEEL_SIZE]- TimerBucket assistant_wheel_[ASSISTANT_WHEEL_SIZE]- std::thread tick_thread_+ ~TimingWheel()+ void Start()+ void Shutdown()+ void Tick()+ void AddTask(const std::shared_ptr<TimerTask>& task) + void TickFunc()}
class TimerBucket {- std::mutex mutex_- std::list<std::weak_ptr<TimerTask>> task_list_+ void AddTask(const std::shared_ptr<TimerTask>& task)+ std::mutex& mutex()+ std::list<std::weak_ptr<TimerTask>>& task_list()
}struct TimerTask {+ TimerTask(uint64_t timer_id)+ uint64_t timer_id_+ std::function<void()> callback+ uint64_t interval_ms+ uint64_t remainder_interval_ms+ uint64_t next_fire_duration_ms+ int64_t accumulated_error_ns+ uint64_t last_execute_time_ns+ std::mutex mutex
}
class TimerOption {+ uint32_t period+ std::function<void()> callback+ bool oneshot+ TimerOption(uint32_t period, std::function<void()> callback, bool oneshot)+ TimerOption()
}note left of Timer外部主要调用类,定时器对象实体
end notenote left of TimerOption配置参数,主要作为入参构造timer,用于配置定时器
end notenote left of TimingWheel环形队列,cyberrt内部实现为二级时钟节拍轮,单例
end notenote left of TimerBucket环形队列中的元素,为链表,存储std::weak_ptr<TimerTask>,等价线程安全的list<std::weak_ptr<TimerTask>>
end noteTimer --> TimerOption : 入参依赖
Timer --> TimingWheel : 成员依赖
TimingWheel --> TimerBucket : 成员依赖
TimerBucket --> TimerTask : 成员依赖 @enduml
实现原理
timingwheel 中的tick线程用于统计时间时间,并获取指向的时钟节拍论里面的元素进行任务触发,并将任务丢到cyberrt的携程池里运行,timewheel的实现基本都大同小异,这里不细说。
bug
bug主要是call back的生命周期管理问题,在代码中其实已经考虑了一部分(shared_ptr+weak_ptr)已经保证了一部分的悬空指针的问题,但是不完全。
bool Timer::InitTimerTask() {task_.reset(new TimerTask(timer_id_));task_->interval_ms = timer_opt_.period;task_->next_fire_duration_ms = task_->interval_ms;if (timer_opt_.oneshot) {std::weak_ptr<TimerTask> task_weak_ptr = task_;task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {auto task = task_weak_ptr.lock();if (task) {std::lock_guard<std::mutex> lg(task->mutex);callback();}};} else {std::weak_ptr<TimerTask> task_weak_ptr = task_;task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {auto task = task_weak_ptr.lock();if (!task) {return;}XXXX //省略TimingWheel::Instance()->AddTask(task);};}return true;
}void TimingWheel::Tick() {auto& bucket = work_wheel_[current_work_wheel_index_];{std::lock_guard<std::mutex> lock(bucket.mutex());auto ite = bucket.task_list().begin();while (ite != bucket.task_list().end()) {auto task = ite->lock();if (task) {ADEBUG << "index: " << current_work_wheel_index_<< " timer id: " << task->timer_id_;auto* callback =reinterpret_cast<std::function<void()>*>(&(task->callback));cyber::Async([this, callback] {if (this->running_) {(*callback)();}});}ite = bucket.task_list().erase(ite);}}
}
以上代码,构建TimerTask添加到timing wheel中,task为share_ptr,所有权归属timer对象很合理,传递weak_ptr对象給timingWhee,timer对象释放后,节拍轮里面的weak_ptr不再有效进行不必要的触发,这也很正确,合理的考虑了timer对象持有的task和timingwheel中task的异步生命周期管理的问题。
但是在tick函数中,将触发的task放到异步协程池运行的时候则出现问题了,未考虑异步生命周期的问题。
现在假设我们有这样一个场景
创建了一个10ms的周期timer,经过第一个10ms时触发了定时器并将回掉丢到协程池中并推出tick,假设当时协程池比较繁忙,有恰巧我们迅速释放掉timer对象或者stop掉timer对象,you lose,程序boom了。
因为传递task这个share_ptr对象里的callback 成员函数进行了异步调用,tick函数内虽然正确获取了task,退出该函数,task引用计数-1,于此同时我们stop了timer 对象s或者 析构了对象,time持有的task,引用再次-1归0,tick函数内cyber::Async虽然正确获取了task的callback,但此时,已经是悬空指针了。
修改
知道了原因,改起来也很方便,无非就是异步调用指针的管理而已。
void TimingWheel::Tick() {auto& bucket = work_wheel_[current_work_wheel_index_];{std::lock_guard<std::mutex> lock(bucket.mutex());auto ite = bucket.task_list().begin();while (ite != bucket.task_list().end()) {auto task = ite->lock();if (task) {ADEBUG << "index: " << current_work_wheel_index_<< " timer id: " << task->timer_id_;// auto* callback =// reinterpret_cast<std::function<void()>*>(&(task->callback));cyber::Async([this, weakTask = std::weak_ptr<TimerTask>(task)] {auto task = weakTask->lock();if (this->running_ && task ) {task->callback();}});}ite = bucket.task_list().erase(ite);}}
}