通过chromium 官方文档,线程和任务一节我们可以知道 ,chromium有两类线程,一类是普通线程,最典型的就是io线程和ui线程。 另一类是 线程池线程。 今天我们先分析普通线程的实现,下一篇文章分析线程池的实现。(基于版本 117.0.5847.0(开发者内部版本) (64 位) 分析)。
通过官方文档我们知道,chromium的普通线程采用轮询消息队列的模式来处理任务。这种模式在ui编程中非常常见,比如Android的MessageLooper模型, glib的MainLooper模型, java的netty库都是采用此种模式,那么分析chromium模型我们就要先找到线程运行的核心消息循环。
方便后面学习,我们先给出总体数据结构:
chromium使用Thread类表示普通的线程。
我们先看一下Thread的启动逻辑
base/threading/thread.cc
一、任务循环
bool Thread::Start() {......return StartWithOptions(std::move(options));
}bool Thread::StartWithOptions(Options options) {......{AutoLock lock(thread_lock_);bool success = options.joinable? PlatformThread::CreateWithType(options.stack_size, this, &thread_,options.thread_type, options.message_pump_type): PlatformThread::CreateNonJoinableWithType(options.stack_size, this, options.thread_type,options.message_pump_type);......return true;
}
Thread启动创建一个平台线程PlatformThread, 在linux系统里面,该线程就是pthread线程, pthread启动之后调用Thread的ThreadMain方法进入主循环。
void Thread::ThreadMain() {....RunLoop run_loop;run_loop_ = &run_loop;Run(run_loop_);......
}void Thread::Run(RunLoop* run_loop) {......run_loop->Run();
}
ThreadMain方法创建一个RunLoop,并且调用RunLoop->Run()方法进入循环。
base/run_loop.cc
void RunLoop::Run(const Location& location) {......delegate_->Run(application_tasks_allowed, TimeDelta::Max());......
}
RunLooper 调用delegate_->Run()方法进入循环。 要想继续分析我们看一下delegate_是什么。
ABSL_CONST_INIT thread_local RunLoop::Delegate* delegate = nullptr;RunLoop::RunLoop(Type type): delegate_(delegate),type_(type),origin_task_runner_(SingleThreadTaskRunner::GetCurrentDefault()) {DCHECK(delegate_) << "A RunLoop::Delegate must be bound to this thread prior ""to using RunLoop.";DCHECK(origin_task_runner_);
}
delegate是一个thread_local 变量, 在这里是ThreadControllerWithMessagePumpImpl对象, 感兴趣的读者自行分析创建和赋值过程。本文主要关注点是chromium线程模型的大体逻辑,不去分析细枝末节,请自行阅读代码进行理解。 我们直接看ThreadControllerWithMessagePumpImpl的Run方法。
base/task/sequence_manager/thread_controller_with_message_pump_impl.cc
void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed,TimeDelta timeout) {......if (application_tasks_allowed && !main_thread_only().task_execution_allowed) {// Allow nested task execution as explicitly requested.DCHECK(RunLoop::IsNestedOnCurrentThread());// 本来执行任务的时候不允许再执行其他任务, 这里为了运行嵌套RunLooper 临时设置main_thread_only().task_execution_allowed = truemain_thread_only().task_execution_allowed = true;pump_->Run(this);main_thread_only().task_execution_allowed = false;} else {// 非嵌套RunLooper 直接执行pump_->Run(), 如何推断首次执行? 因为默认task_execution_allowed=truepump_->Run(this);}......
}
ThreadControllerWithMessagePumpImpl.Run() 方法又调用了pump_->Run() 方法。 pump_在不同平台有不同实现,我们以linux下的libevent实现为例子
// Reentrant!
void MessagePumpLibevent::Run(Delegate* delegate) {......RunState run_state(delegate); // 创建一个runstate, 并且使用auto_reset_run_state赋值给成员变量run_state_, 主要作用是当上一层runstate 退出之后,会自动恢复下一层run_state, 这样可以支持嵌套RunLooperAutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);// event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.// Instead, make our own timer and reuse it on each call to event_base_loop().std::unique_ptr<event> timer_event(new event);for (;;) {// Do some work and see if the next task is ready right away.// 执行任务,并且获取下一次要执行的任务信息Delegate::NextWorkInfo next_work_info = delegate->DoWork();bool immediate_work_available = next_work_info.is_immediate();if (run_state.should_quit)break;// Process native events if any are ready. Do not block waiting for more. Do// not instantiate a ScopedDoWorkItem for this call as:// - This most often ends up calling OnLibeventNotification() below which// already instantiates a ScopedDoWorkItem (and doing so twice would// incorrectly appear as nested work).// - "ThreadController active" is already up per the above DoWork() so this// would only be about detecting #work-in-work-implies-nested// (ref. thread_controller.h).// - This can result in the same work as the// event_base_loop(event_base_, EVLOOP_ONCE) call at the end of this// method and that call definitely can't be in a ScopedDoWorkItem as// it includes sleep.// - The only downside is that, if a native work item other than// OnLibeventNotification() did enter a nested loop from here, it// wouldn't be labeled as such in tracing by "ThreadController active".// Contact gab@/scheduler-dev@ if a problematic trace emerges.event_base_loop(event_base_.get(), EVLOOP_NONBLOCK); // 处理原生事件,主要是处理io事件bool attempt_more_work = immediate_work_available || processed_io_events_; // 下一个任务需要马上执行或者有io事件processed_io_events_ = false;if (run_state.should_quit)break;if (attempt_more_work) // 需要立即处理下一个事件,执行下一次循环continue;attempt_more_work = delegate->DoIdleWork(); // 没有立即要执行任务,执行idle任务if (run_state.should_quit)break;if (attempt_more_work) // idle 任务告知有立即执行任务,继续下一次循环continue;bool did_set_timer = false;// If there is delayed work.DCHECK(!next_work_info.delayed_run_time.is_null());if (!next_work_info.delayed_run_time.is_max()) { // 下一次任务有明确执行时间预期,添加一个libevent的超时事件用于处理下次任务const TimeDelta delay = next_work_info.remaining_delay();// Setup a timer to break out of the event loop at the right time.struct timeval poll_tv;poll_tv.tv_sec = static_cast<time_t>(delay.InSeconds());poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;event_set(timer_event.get(), -1, 0, timer_callback, event_base_.get());event_base_set(event_base_.get(), timer_event.get());event_add(timer_event.get(), &poll_tv);did_set_timer = true;}// Block waiting for events and process all available upon waking up. This// is conditionally interrupted to look for more work if we are aware of a// delayed task that will need servicing.delegate->BeforeWait();event_base_loop(event_base_.get(), EVLOOP_ONCE); // 进入libevent 等待事件触发// We previously setup a timer to break out the event loop to look for more// work. Now that we're here delete the event.if (did_set_timer) {event_del(timer_event.get());}if (run_state.should_quit)break;}
}
MessagePumpLibevent::Run() 方法是真正处理事件的地方,这里有一个死循环检测事件触发(就是我们要找的消息循环),并执行事件,这是很常见的poll模型。 整体流程如下
- 如果有可以执行的任务,就执行它并获取下一次任务的时间
- 设置下一次任务执行时间作为poll的超时时间
- 等待事件触发
- 重复1-4 直到退出。
这里还包含对io事件(文件可读、可写,阻塞等事件)的处理,实现方式就是当IO事件触发的时候添加一个任务到任务队列里面,然后设置processed_io_events_=true。
由此我们可以知道,处理事件的函数为delegate->DoIdleWork(),也就是从队列取任务处理,并获取下一次任务的信息, 这里delegate为ThreadControllerWithMessagePumpImpl。 我们继续分析
二、任务获取和执行
任务获取主要的工作是从任务队列里面获取消息,也就是ThreadControllerWithMessagePumpImpl.DoIdleWork() 方法主要工作,前面我们已经分析了这个函数主要的职责:执行任务,并且获取下一次任务执行的时间。
MessagePump::Delegate::NextWorkInfo
ThreadControllerWithMessagePumpImpl::DoWork() {MessagePump::Delegate::NextWorkInfo next_work_info{};LazyNow continuation_lazy_now(time_source_);absl::optional<WakeUp> next_wake_up = DoWorkImpl(&continuation_lazy_now);......// Schedule a continuation.WorkDeduplicator::NextTask next_task =(next_wake_up && next_wake_up->is_immediate())? WorkDeduplicator::NextTask::kIsImmediate: WorkDeduplicator::NextTask::kIsDelayed;// 去重模块要求直接执行下一次任务,直接返回if (work_deduplicator_.DidCheckForMoreWork(next_task) ==ShouldScheduleWork::kScheduleImmediate) {// Need to run new work immediately, but due to the contract of DoWork// we only need to return a null TimeTicks to ensure that happens.return next_work_info;}// 没有下一次执行的任务,设置时间为TimeTicks::Max()// Special-casing here avoids unnecessarily sampling Now() when out of work.if (!next_wake_up) {main_thread_only().next_delayed_do_work = TimeTicks::Max();next_work_info.delayed_run_time = TimeTicks::Max();return next_work_info;}......// Don't request a run time past |main_thread_only().quit_runloop_after|.// 下次执行时间早于退出时间,设置下次执行时间为退出时间if (main_thread_only().next_delayed_do_work >main_thread_only().quit_runloop_after) {main_thread_only().next_delayed_do_work =main_thread_only().quit_runloop_after;// If we've passed |quit_runloop_after| there's no more work to do.if (continuation_lazy_now.Now() >= main_thread_only().quit_runloop_after) {next_work_info.delayed_run_time = TimeTicks::Max();return next_work_info;}}next_work_info.delayed_run_time = CapAtOneDay(main_thread_only().next_delayed_do_work, &continuation_lazy_now);next_work_info.recent_now = continuation_lazy_now.Now();return next_work_info;
}
这里主要调用DoWorkImp方法执行任务并获取下一次任务执行时间, 获取时间后做了一些修正。
absl::optional<WakeUp> ThreadControllerWithMessagePumpImpl::DoWorkImpl(LazyNow* continuation_lazy_now) {// 如果允许批量执行可以执行的任务, 则设置最大批量执行时间// Keep running tasks for up to 8ms before yielding to the pump when tasks are// run by batches.const base::TimeDelta batch_duration =RunsTasksByBatches() ? base::Milliseconds(8) : base::Milliseconds(0);const absl::optional<base::TimeTicks> start_time =batch_duration.is_zero()? absl::nullopt: absl::optional<base::TimeTicks>(time_source_->NowTicks());absl::optional<base::TimeTicks> recent_time = start_time;// Loops for |batch_duration|, or |work_batch_size| times if |batch_duration|// is zero.// 检查批量超时时间和批量执行任务个数两个条件, 进行批量执行任务for (int num_tasks_executed = 0;(!batch_duration.is_zero() &&(recent_time.value() - start_time.value()) < batch_duration) ||(batch_duration.is_zero() &&num_tasks_executed < main_thread_only().work_batch_size);++num_tasks_executed) {......// 选择可以执行的任务absl::optional<SequencedTaskSource::SelectedTask> selected_task =main_thread_only().task_source->SelectNextTask(lazy_now_select_task,select_task_option);......{......// 运行任务task_annotator_.RunTask("ThreadControllerImpl::RunTask", selected_task->task,[&selected_task, &source](perfetto::EventContext& ctx) {if (selected_task->task_execution_trace_logger) {selected_task->task_execution_trace_logger.Run(ctx, selected_task->task);}source->MaybeEmitTaskDetails(ctx, selected_task.value());});}......} // 批量执行end......// 获取下一次要执行的任务(批量执行完成,或者延迟任务,或者没有可以执行的任务)return main_thread_only().task_source->GetPendingWakeUp(continuation_lazy_now,select_task_option);
}
DoWorkImp() 函数主要做三个工作
1、调用main_thread_only().task_source->SelectNextTask(lazy_now_select_task, select_task_option) 函数选择一些可以执行的任务进行执行
2、调用task_annotator_.RunTask() 执行任务
3、最后调用main_thread_only().task_source->GetPendingWakeUp(continuation_lazy_now,select_task_option) 获取下一次要执行的任务。返回给消息循环作为休眠时间的参考。
chromium支持定时任务,这里获取到的下一次任务有三种情况:
1、任务还没有达到定时时间。
2、需要立即执行的任务,也就是一次批量任务没有处理完所有可以执行的任务。
3、没有任务
这里补充一点 main_thread_only() 获取到的MainThreadOnly代表只在当前线程中使用的数据, 可以不用加锁访问。下面我们就按照DoWorkImp的三个工作步骤去分析获取和执行任务的过程。
先来看main_thread_only().task_source->SelectNextTask(lazy_now_select_task, select_task_option) , 这里task_source 为SequenceManagerImpl。
再分析这段代码之前我们先来介绍一些SequenceManagerImpl维护的队列数据结构。
可以先查看这个官方文档。
我们 前面说过MainThead在固定线程中访问(不需要持锁),与之相对的是AnyThread 可以再任意线程访问(需要持锁)。 我们来看SequenceManagerImpl维护的队列数据结构。 SequenceManagerImpl的MainThreadonly持有一个WakeUpQueue结构,该结构通过IntrusiveHeap(最大堆数据结构)管理多个ScheduleWake数据结构, 按照weak 从小到达的顺序排序。 ScheduleWake持有TaskQueueImpl对象,也就是说一个SequenceManagerImpl可以管理多个TaskQueueImpl。每个TaskQueueImpl 是真正管理任务的地方, TaskQueueImpl 主要有四个队列,非别是
- MainThreadOnly->delayed_work_queue;
- MainThreadOnly-> immediate_work_queue;
- MainThreadOnly-> delayed_incoming_queue;
- AnyThread->immediate_incoming_queue;
前三个队列是MainThreadOnly持有,表示只能在目标线程中使用, 最后一个队列是AnyThread持有,表示可以在任意线程使用。 延迟任务提交后会进入delayed_incoming_queue, 当任务到达可执行时间后提交到 delayed_work_queue,然后去执行。 立即执行任务先提交到immediate_incoming_queue,然后再由目标线程提交到immediate_work_queue,然后去执行。 可以仔细阅读下官方文档。
有了上述背景知识我们再分析SequenceManagerImpl->SelectNextTaskImpl() 函数
absl::optional<SequenceManagerImpl::SelectedTask>
SequenceManagerImpl::SelectNextTaskImpl(LazyNow& lazy_now,SelectTaskOption option) {ReloadEmptyWorkQueues(); //1、 当MainThreadOnly-> immediate_work_queue为空时候尝试从AnyThread->immediate_incoming_queue获取任务MoveReadyDelayedTasksToWorkQueues(&lazy_now); // 2、 将准备就绪的延期任务从MainThreadOnly-> delayed_incoming_queue 提交到MainThreadOnly->delayed_work_queue......while (true) {// 3、选择一个WorkQueue (TaskQueueImpl::MainThreadOnly-> immediate_work_queue// 或者TaskQueueImpl::MainThreadOnly->delayed_work_queue, 这里// SelectNextTaskImpl::main_thread_only().selector 管理多个WorkQueue,// 使用一些优先级策略来选择WorkQueue )internal::WorkQueue* work_queue =main_thread_only().selector.SelectWorkQueueToService(option); ......if (!work_queue)return absl::nullopt;......// work_queue->TakeTaskFromWorkQueue() 或缺Task 放到main_thread_only().task_execution_stackmain_thread_only().task_execution_stack.emplace_back(work_queue->TakeTaskFromWorkQueue(), work_queue->task_queue(),InitializeTaskTiming(work_queue->task_queue()));// main_thread_only().task_execution_stack 取出来要执行的TaskExecutingTask& executing_task =*main_thread_only().task_execution_stack.rbegin();NotifyWillProcessTask(&executing_task, &lazy_now);......// 4、返回Taskreturn SelectedTask(executing_task.pending_task,executing_task.task_queue->task_execution_trace_logger(),executing_task.priority, executing_task.task_queue_name);}
}
这个函数执行4步操作
- 执行ReloadEmptyWorkQueues(); 当MainThreadOnly-> immediate_work_queue为空时候尝试从AnyThread->immediate_incoming_queue获取任务
- 执行MoveReadyDelayedTasksToWorkQueues(&lazy_now),将准备就绪的延期任务从MainThreadOnly-> delayed_incoming_queue 提交到MainThreadOnly->delayed_work_queue
- 执行 main_thread_only().selector.SelectWorkQueueToService(option)选择一个WorkQueue (TaskQueueImpl::MainThreadOnly-> immediate_work_queue 或者TaskQueueImpl::MainThreadOnly->delayed_work_queue, 这里 SelectNextTaskImpl::main_thread_only().selector 管理多个WorkQueue,使用一些优先级策略来选择WorkQueue )(实际管理多个TaskQueueImpl)
- 返回选取的Task。
到这里任务选取的逻辑我们就看完了。
我们再来分析任务运行的过程。
base/task/common/task_annotator.h|base/task/common/task_annotator.cc
void RunTask(perfetto::StaticString event_name,PendingTask& pending_task,Args&&... args) {......RunTaskImpl(pending_task);}
void TaskAnnotator::RunTaskImpl(PendingTask& pending_task) {.......std::move(pending_task.task).Run();.......
}
代码很简单,调用了pending_task.task的Run方法。 在下一节任务提交阶段我们会看到pending_task.task是什么。
最后我们分析获取下一次任务唤醒时间的逻辑。
absl::optional<WakeUp> SequenceManagerImpl::GetPendingWakeUp(LazyNow* lazy_now,SelectTaskOption option) const {......// Otherwise we need to find the shortest delay, if any. NB we don't need to// call MoveReadyDelayedTasksToWorkQueues because it's assumed// DelayTillNextTask will return TimeDelta>() if the delayed task is due to// run now.return AdjustWakeUp(GetNextDelayedWakeUpWithOption(option), lazy_now);
}absl::optional<WakeUp> SequenceManagerImpl::GetNextDelayedWakeUpWithOption(SelectTaskOption option) const {DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);if (option == SelectTaskOption::kSkipDelayedTask)return absl::nullopt;return GetNextDelayedWakeUp();
}absl::optional<WakeUp> SequenceManagerImpl::GetNextDelayedWakeUp() const {DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);return main_thread_only().wake_up_queue->GetNextDelayedWakeUp();
}
这里调用SequenceManagerImpl::GetNextDelayedWakeUp() 获取下次唤醒时间。 实际调用SequenceManagerImpl::wake_up_queue->GetNextDelayedWakeUp()方法
base/task/sequence_manager/wake_up_queue.cc
absl::optional<WakeUp> WakeUpQueue::GetNextDelayedWakeUp() const {DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);if (wake_up_queue_.empty())return absl::nullopt;WakeUp wake_up = wake_up_queue_.top().wake_up;......return wake_up;
}
GetNextDelayedWakeUp() 方法从wake_up_queue_选择最近唤醒的WakeUp信息返回。我们前面说过wake_up_queue_为IntrusiveHeap<ScheduledWakeUp, std::greater<>>(最大堆数据结构, 这里使用 std::greater作为比较方法,实际最小唤醒时间在堆的最上面)。
好了我们再看一下是如何更新wake_up_queue_结构的。 关键就在于前面我们多次看到的SequenceManagerImpl::MoveReadyDelayedTasksToWorkQueues函数。
base/task/sequence_manager/sequence_manager_impl.cc
void SequenceManagerImpl::MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now) {TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),"SequenceManagerImpl::MoveReadyDelayedTasksToWorkQueues");EnqueueOrder delayed_task_group_enqueue_order = GetNextSequenceNumber();main_thread_only().wake_up_queue->MoveReadyDelayedTasksToWorkQueues(lazy_now, delayed_task_group_enqueue_order);......
}
调用main_thread_only().wake_up_queue->MoveReadyDelayedTasksToWorkQueues() 方法
src/base/task/sequence_manager/wake_up_queue.cc
void WakeUpQueue::MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now,EnqueueOrder enqueue_order) {DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);bool update_needed = false;while (!wake_up_queue_.empty() &&wake_up_queue_.top().wake_up.earliest_time() <= lazy_now->Now()) {// wake_up_queue_ 最近的延期执行的任务已经可以执行, 则调用queue->OnWakeUp(lazy_now, enqueue_order) 唤醒TaskQueueImpl, 这会导致wake_up_queue_发生变化internal::TaskQueueImpl* queue = wake_up_queue_.top().queue;// OnWakeUp() is expected to update the next wake-up for this queue with// SetNextWakeUpForQueue(), thus allowing us to make progress.queue->OnWakeUp(lazy_now, enqueue_order);update_needed = true;}if (!update_needed || wake_up_queue_.empty())return;// If any queue was notified, possibly update following queues. This ensures// the wake up is up to date, which is necessary because calling OnWakeUp() on// a throttled queue may affect state that is shared between other related// throttled queues. The wake up for an affected queue might be pushed back// and needs to be updated. This is done lazily only once the related queue// becomes the next one to wake up, since that wake up can't be moved up.// `wake_up_queue_` is non-empty here, per the condition above.// // 更新其他Queue的wakeup 信息internal::TaskQueueImpl* queue = wake_up_queue_.top().queue;queue->UpdateWakeUp(lazy_now);// 继续引起改变,调用 queue->UpdateWakeUp(lazy_now),直到不再发生变化while (!wake_up_queue_.empty()) {internal::TaskQueueImpl* old_queue =std::exchange(queue, wake_up_queue_.top().queue);if (old_queue == queue)break;queue->UpdateWakeUp(lazy_now);}
}
WakeUpQueue::MoveReadyDelayedTasksToWorkQueues函数总体分为2步执行
1、对于有TaskQueueImpl延时任务达到可执行状态的情况下调用TaskQueueImpl.OnWakeUp() 函数激活队列。这可能导致wake_up_queue_ 排序发生变化
2、由于1可能导致wake_up_queue_排序发生变化,需要更新其他TaskQueueImpl 的wakup信息,从而保持wake_up_queue_ 排序正确。
base/task/sequence_manager/task_queue_impl.cc
void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);if (main_thread_only().throttler) {main_thread_only().throttler->OnWakeUp(lazy_now);}
}void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(LazyNow* lazy_now,EnqueueOrder enqueue_order) {......StackVector<Task, 8> tasks_to_delete;// 1 从 MainThreadOnly.delayed_incoming_queue里面找到准备就绪的延时任务放到// MainThreadOnly.delayed_work_queue, 同时删除取消任务while (!main_thread_only().delayed_incoming_queue.empty()) {const Task& task = main_thread_only().delayed_incoming_queue.top();CHECK(task.task);// Leave the top task alone if it hasn't been canceled and it is not ready.const bool is_cancelled = task.task.IsCancelled();if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())break;Task ready_task = main_thread_only().delayed_incoming_queue.take_top();if (is_cancelled) {tasks_to_delete->push_back(std::move(ready_task));continue;}......ready_task.set_enqueue_order(enqueue_order);ActivateDelayedFenceIfNeeded(ready_task);delayed_work_queue_task_pusher.Push(std::move(ready_task));}// Explicitly delete tasks last.tasks_to_delete->clear();
// 调用UpdateWakeUpUpdateWakeUp(lazy_now);
}
这个函数主要有两步操作
1、从 MainThreadOnly.delayed_incoming_queue里面找到准备就绪的延时任务放到MainThreadOnly.delayed_work_queue, 同时删除取消任务
2、UpdateWakeUp(lazy_now) ,所以我们可以看到无论是TaskQueueImpl::OnWakeUp() 和 WakeUpQueue::MoveReadyDelayedTasksToWorkQueues() 都会调用Queue的UpdateWakeUp()方法来更新wakup信息。
好了我们来分析 UpdateWakeUp(lazy_now)
absl::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {// Note we don't scheduled a wake-up for disabled queues.if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())return absl::nullopt;......const auto& top_task = main_thread_only().delayed_incoming_queue.top();return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,top_task.delay_policy};
}void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {absl::optional<WakeUp> wake_up = GetNextDesiredWakeUp();if (main_thread_only().throttler && IsQueueEnabled()) {// GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is// nullopt, e.g. to throttle immediate tasks.wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());}SetNextWakeUp(lazy_now, wake_up);
}void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,absl::optional<WakeUp> wake_up) {if (main_thread_only().scheduled_wake_up == wake_up)return;main_thread_only().scheduled_wake_up = wake_up;main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,wake_up);
}
这里可以看到WakeUp信息通过TaskQueueImpl::MainThreadOnly->delayed_incoming_queue 获取最近可以执行的延时任务设置。最后调用WakeUpQueue->SetNextWakeUpForQueue() 方法更新WakeUpQueue->wake_up_queue_ 堆结构排序。
base/task/sequence_manager/wake_up_queue.cc
void WakeUpQueue::SetNextWakeUpForQueue(internal::TaskQueueImpl* queue,LazyNow* lazy_now,absl::optional<WakeUp> wake_up) {......absl::optional<WakeUp> previous_wake_up = GetNextDelayedWakeUp();absl::optional<WakeUpResolution> previous_queue_resolution;if (queue->heap_handle().IsValid()) {previous_queue_resolution =wake_up_queue_.at(queue->heap_handle()).wake_up.resolution;}if (wake_up) {// Insert a new wake-up into the heap.if (queue->heap_handle().IsValid()) { // 已经存在则更新// O(log n)wake_up_queue_.Replace(queue->heap_handle(), {wake_up.value(), queue});} else { // 不存在插入// O(log n)wake_up_queue_.insert({wake_up.value(), queue});}} else {// 移除// Remove a wake-up from heap if present.if (queue->heap_handle().IsValid())wake_up_queue_.erase(queue->heap_handle());}absl::optional<WakeUp> new_wake_up = GetNextDelayedWakeUp();......// 最近wakeup信息发生变化,通知消息循环修改超时时间if (new_wake_up != previous_wake_up)OnNextWakeUpChanged(lazy_now, GetNextDelayedWakeUp());
}
1、把最新的WakeUp信息添加到wake_up_queue_
2、如果新的WakeUp信息影响了wake_up_queue_排序,调用OnNextWakeUpChanged() 尝试更新消息循环的超时时间。
void DefaultWakeUpQueue::OnNextWakeUpChanged(LazyNow* lazy_now,absl::optional<WakeUp> wake_up) {sequence_manager_->SetNextWakeUp(lazy_now, wake_up);
}void SequenceManagerImpl::ScheduleWork() {controller_->ScheduleWork();
}void SequenceManagerImpl::SetNextWakeUp(LazyNow* lazy_now,absl::optional<WakeUp> wake_up) {auto next_wake_up = AdjustWakeUp(wake_up, lazy_now);if (next_wake_up && next_wake_up->is_immediate()) {ScheduleWork();} else {controller_->SetNextDelayedDoWork(lazy_now, next_wake_up);}
}
这里对于延时任务导致的WakeUp变化调用ThreadControllerWithMessagePumpImpl->SetNextDelayedDoWork()方法。立即执行任务则调用ThreadControllerWithMessagePumpImpl->ScheduleWork() 方法。
void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(LazyNow* lazy_now,absl::optional<WakeUp> wake_up) {......pump_->ScheduleDelayedWork({run_time, lazy_now->Now()});......}
}void ThreadControllerWithMessagePumpImpl::ScheduleWork() {base::internal::CheckedLock::AssertNoLockHeldOnCurrentThread();
......pump_->ScheduleWork();
......
}void MessagePumpLibevent::ScheduleWork() {
......// Tell libevent (in a threadsafe way) that it should break out of its loop.char buf = 0;long nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));DPCHECK(nwrite == 1 || errno == EAGAIN) << "nwrite:" << nwrite;
}void MessagePumpLibevent::ScheduleDelayedWork(const Delegate::NextWorkInfo& next_work_info) {}
可以看到立即执行任务导致的wakeup更新会通过写入wakeup_pipe_in_ 唤醒消息循环,而延时任务什么都不做。因为延时任务的wakeup更新是在消息线程里面,下一轮循环会自动设置超时时间。
到这里任务获取与执行阶段我们就分析完了。
三、任务投递
在任务获取阶段我们知道有四个队列,分别是
- MainThreadOnly->delayed_work_queue;
- MainThreadOnly-> immediate_work_queue;
- MainThreadOnly-> delayed_incoming_queue;
- AnyThread->immediate_incoming_queue;
所以我们的任务投递其实就是向MainThreadOnly-> delayed_incoming_queue 和 AnyThread->immediate_incoming_queue 投递任务。 并且如果有新的任务,我们需要唤醒消息循环,并且更新wakeup信息。
那么我们如何投递任务,官方文档里面这么写。
Posting to the Main Thread or to the IO Thread in the Browser Process
To post tasks to the main thread or to the IO thread, use content::GetUIThreadTaskRunner({}) or content::GetIOThreadTaskRunner({}) from content/public/browser/browser_thread.h
You may provide additional BrowserTaskTraits as a parameter to those methods though this is generally still uncommon in BrowserThreads and should be reserved for advanced use cases.
There‘s an ongoing migration (task APIs v3) away from the previous base-API-with-traits which you may still find throughout the codebase (it’s equivalent):
base::PostTask(FROM_HERE, {content::BrowserThread::UI}, ...);base::CreateSingleThreadTaskRunner({content::BrowserThread::IO})->PostTask(FROM_HERE, ...);
我们以content::GetUIThreadTaskRunner({}).PostTask(const Location& from_here, OnceClosure task) 这种方式投递任务为例进行分析。
content/browser/browser_thread_impl.cc
scoped_refptr<base::SingleThreadTaskRunner>
BrowserTaskExecutor::GetUIThreadTaskRunner(const BrowserTaskTraits& traits) {return Get()->GetTaskRunner(BrowserThread::UI, traits);
}content/browser/scheduler/browser_task_executor.cc
scoped_refptr<base::SingleThreadTaskRunner>
BrowserTaskExecutor::GetUIThreadTaskRunner(const BrowserTaskTraits& traits) {return Get()->GetTaskRunner(BrowserThread::UI, traits);
}scoped_refptr<base::SingleThreadTaskRunner>
BaseBrowserTaskExecutor::GetTaskRunner(BrowserThread::ID identifier,const BrowserTaskTraits& traits) const {const QueueType queue_type = GetQueueType(traits);switch (identifier) {case BrowserThread::UI: {return browser_ui_thread_handle_->GetBrowserTaskRunner(queue_type);}case BrowserThread::IO:return browser_io_thread_handle_->GetBrowserTaskRunner(queue_type);case BrowserThread::ID_COUNT:NOTREACHED();}return nullptr;
}BrowserTaskExecutor::BrowserTaskExecutor(std::unique_ptr<BrowserUIThreadScheduler> browser_ui_thread_scheduler,std::unique_ptr<BrowserIOThreadDelegate> browser_io_thread_delegate): ui_thread_executor_(std::make_unique<UIThreadExecutor>(std::move(browser_ui_thread_scheduler))),io_thread_executor_(std::make_unique<IOThreadExecutor>(std::move(browser_io_thread_delegate))) {browser_ui_thread_handle_ = ui_thread_executor_->GetUIThreadHandle();browser_io_thread_handle_ = io_thread_executor_->GetIOThreadHandle();ui_thread_executor_->SetIOThreadHandle(browser_io_thread_handle_);io_thread_executor_->SetUIThreadHandle(browser_ui_thread_handle_);
}
首先获取TaskRunner 实例,然后使用TaskRunner 投递任务。 通过一系列转发,函数实际使用browser_ui_thread_handle_->GetBrowserTaskRunner(queue_type) 获取的TaskRunner。那么browser_ui_thread_handle_是在哪里创建的?
content/browser/scheduler/browser_task_executor.cc
BrowserTaskExecutor::UIThreadExecutor::UIThreadExecutor(std::unique_ptr<BrowserUIThreadScheduler> browser_ui_thread_scheduler): browser_ui_thread_scheduler_(std::move(browser_ui_thread_scheduler)) {browser_ui_thread_handle_ = browser_ui_thread_scheduler_->GetHandle();
}scoped_refptr<BrowserUIThreadScheduler::Handle>
BrowserTaskExecutor::UIThreadExecutor::GetUIThreadHandle() {return browser_ui_thread_handle_;
}
}
也就是说handle是通过browser_ui_thread_scheduler_->GetHandle() 获取的。
// static
void BrowserTaskExecutor::Create() {DCHECK(!base::SingleThreadTaskRunner::HasCurrentDefault());CreateInternal(std::make_unique<BrowserUIThreadScheduler>(),std::make_unique<BrowserIOThreadDelegate>());
}// static
void BrowserTaskExecutor::CreateInternal(std::unique_ptr<BrowserUIThreadScheduler> browser_ui_thread_scheduler,std::unique_ptr<BrowserIOThreadDelegate> browser_io_thread_delegate) {DCHECK(!g_browser_task_executor);g_browser_task_executor =new BrowserTaskExecutor(std::move(browser_ui_thread_scheduler),std::move(browser_io_thread_delegate));g_browser_task_executor->browser_ui_thread_handle_->EnableAllExceptBestEffortQueues();}
browser_ui_thread_scheduler_为BrowserUIThreadScheduler的实例。我们看下BrowserUIThreadScheduler的构造
BrowserUIThreadScheduler::BrowserUIThreadScheduler(): owned_sequence_manager_(base::sequence_manager::CreateUnboundSequenceManager(base::sequence_manager::SequenceManager::Settings::Builder().SetMessagePumpType(base::MessagePumpType::UI).SetCanRunTasksByBatches(true).SetPrioritySettings(internal::CreateBrowserTaskPrioritySettings()).Build())),task_queues_(BrowserThread::UI, owned_sequence_manager_.get()),queue_enabled_voters_(task_queues_.CreateQueueEnabledVoters()),handle_(task_queues_.GetHandle()) {CommonSequenceManagerSetup(owned_sequence_manager_.get());owned_sequence_manager_->SetDefaultTaskRunner(handle_->GetDefaultTaskRunner());owned_sequence_manager_->BindToMessagePump(base::MessagePump::Create(base::MessagePumpType::UI));g_browser_ui_thread_scheduler = this;
}
看一下base::sequence_manager::CreateUnboundSequenceManager函数
std::unique_ptr<SequenceManager> CreateUnboundSequenceManager(SequenceManager::Settings settings) {return internal::SequenceManagerImpl::CreateUnbound(std::move(settings));
}std::unique_ptr<SequenceManagerImpl> SequenceManagerImpl::CreateUnbound(SequenceManager::Settings settings) {auto thread_controller =ThreadControllerWithMessagePumpImpl::CreateUnbound(settings);return WrapUnique(new SequenceManagerImpl(std::move(thread_controller),std::move(settings)));
最终构造了一个ThreadControllerWithMessagePumpImpl实例和SequenceManagerImpl实例子, 这两个对象通过前边分析我们已经比较熟悉了。
SequenceManagerImpl::SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller,SequenceManager::Settings settings): associated_thread_(controller->GetAssociatedThread()),controller_(std::move(controller)),settings_(std::move(settings)),metric_recording_settings_(InitializeMetricRecordingSettings(settings_.randomised_sampling_enabled)),add_queue_time_to_tasks_(settings_.add_queue_time_to_tasks),empty_queues_to_reload_(associated_thread_),main_thread_only_(this, associated_thread_, settings_, settings_.clock),clock_(settings_.clock) {......// 这里把ThreadControllerWithMessagePumpImpl实例的task_source // 设置成SequenceManagerImpl 实例子,和我们获取任务一节分析的一致。controller_->SetSequencedTaskSource(this);
}
这里把ThreadControllerWithMessagePumpImpl实例的task_source 设置成SequenceManagerImpl 实例子,和我们获取任务一节分析的一致。
再来看看BrowserUIThreadScheduler::task_queues_构造
content/browser/scheduler/browser_task_queues.cc
BrowserTaskQueues::BrowserTaskQueues(BrowserThread::ID thread_id,base::sequence_manager::SequenceManager* sequence_manager) {for (size_t i = 0; i < queue_data_.size(); ++i) {queue_data_[i].task_queue = sequence_manager->CreateTaskQueue(base::sequence_manager::TaskQueue::Spec(GetTaskQueueName(thread_id, static_cast<QueueType>(i))));queue_data_[i].voter = queue_data_[i].task_queue->CreateQueueEnabledVoter();if (static_cast<QueueType>(i) != QueueType::kDefault) {queue_data_[i].voter->SetVoteToEnable(false);}}GetBrowserTaskQueue(QueueType::kUserVisible)->SetQueuePriority(BrowserTaskPriority::kLowPriority);// Best effort queueGetBrowserTaskQueue(QueueType::kBestEffort)->SetQueuePriority(BrowserTaskPriority::kBestEffortPriority);// User Input queueGetBrowserTaskQueue(QueueType::kUserInput)->SetQueuePriority(BrowserTaskPriority::kHighestPriority);GetBrowserTaskQueue(QueueType::kNavigationNetworkResponse)->SetQueuePriority(BrowserTaskPriority::kHighPriority);GetBrowserTaskQueue(QueueType::kServiceWorkerStorageControlResponse)->SetQueuePriority(BrowserTaskPriority::kHighestPriority);// Control queuecontrol_queue_ =sequence_manager->CreateTaskQueue(base::sequence_manager::TaskQueue::Spec(GetControlTaskQueueName(thread_id)));control_queue_->SetQueuePriority(BrowserTaskPriority::kControlPriority);// Run all pending queuerun_all_pending_tasks_queue_ =sequence_manager->CreateTaskQueue(base::sequence_manager::TaskQueue::Spec(GetRunAllPendingTaskQueueName(thread_id)));run_all_pending_tasks_queue_->SetQueuePriority(BrowserTaskPriority::kBestEffortPriority);handle_ = base::AdoptRef(new Handle(this));
}
调用了sequence_manager->CreateTaskQueue创建了多个Queue,每个Queue的设置了不同的优先级。并创建了Handle对象,这个Handle对象就是用于创建TaskRunner的。 我们先分析Queue的创建
scoped_refptr<TaskQueue> SequenceManagerImpl::CreateTaskQueue(const TaskQueue::Spec& spec) {return WrapRefCounted(new TaskQueue(CreateTaskQueueImpl(spec), spec));
}std::unique_ptr<internal::TaskQueueImpl>
SequenceManagerImpl::CreateTaskQueueImpl(const TaskQueue::Spec& spec) {DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);std::unique_ptr<internal::TaskQueueImpl> task_queue =std::make_unique<internal::TaskQueueImpl>(this,spec.non_waking ? main_thread_only().non_waking_wake_up_queue.get(): main_thread_only().wake_up_queue.get(),spec);main_thread_only().active_queues.insert(task_queue.get());main_thread_only().selector.AddQueue(task_queue.get(), settings().priority_settings.default_priority());return task_queue;
}
创建了TaskQueueImpl。这里第二个参数为WakeUpQueue,也就是管理该TaskQueueImpl的WakeUpQueue。
再将TaskQueueImpl实例传递给TaskQueue,明显的包装模式。
base/task/sequence_manager/task_queue.cc
TaskQueue::TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl,const TaskQueue::Spec& spec): impl_(std::move(impl)),sequence_manager_(impl_->GetSequenceManagerWeakPtr()),associated_thread_((impl_->sequence_manager())? impl_->sequence_manager()->associated_thread(): MakeRefCounted<internal::AssociatedThreadId>()),default_task_runner_(impl_->CreateTaskRunner(kTaskTypeNone)),name_(impl_->GetProtoName()) {}
这里通过TaskQueueImpl->CreateTaskRunner(kTaskTypeNone) 方法创建了一个TaskRunner, 作为default_task_runner_,后面会用到这个TaskRunner。 我们很快就会看到。
接下来我们来分析如何获取TaskRunner。先看Handle构造
content/browser/scheduler/browser_task_queues.cc
BrowserTaskQueues::Handle::Handle(BrowserTaskQueues* outer): outer_(outer),control_task_runner_(outer_->control_queue_->task_runner()),default_task_runner_(outer_->GetDefaultTaskQueue()->task_runner()),browser_task_runners_(outer_->CreateBrowserTaskRunners()) {}
这里初始化了三个TaskRunner,分别使用三个不同的TaskQueueImpl获取。也就是对应了不同的优先级。
content/browser/scheduler/browser_task_queues.cc
const scoped_refptr<base::SingleThreadTaskRunner>& GetBrowserTaskRunner(QueueType queue_type) const {return browser_task_runners_[static_cast<size_t>(queue_type)];}
GetBrowserTaskRunner 则根据不同的QueueType选择TaskRunner。
我们看一下TaskRunner的创建。
base/task/sequence_manager/task_queue.cc
const scoped_refptr<SingleThreadTaskRunner>& task_runner() const {return default_task_runner_;}
这里返回default_task_runner_ 就是前面通过TaskQueueImpl->CreateTaskRunner(kTaskTypeNone)方法创建的。
scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(TaskType task_type) const {return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,task_type);
}TaskQueueImpl::TaskRunner::TaskRunner(scoped_refptr<GuardedTaskPoster> task_poster,scoped_refptr<const AssociatedThreadId> associated_thread,TaskType task_type): task_poster_(std::move(task_poster)),associated_thread_(std::move(associated_thread)),task_type_(task_type) {}
到这里TaskRunner就创建好了。
终于到了任务投递环节。
bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,OnceClosure callback,TimeDelta delay) {return task_poster_->PostTask(PostedTask(this, std::move(callback), location,delay, Nestable::kNestable,task_type_));
}bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {......outer_->PostTask(std::move(task));return true;
}
这里task_poster_ 实际调用TaskQueueImpl(task_poster_->outer_ 就是TaskQueueImpl)的PostTask()方法。
src/base/task/sequence_manager/task_queue_impl.cc
void TaskQueueImpl::PostTask(PostedTask task) {......if (!task.is_delayed()) {PostImmediateTaskImpl(std::move(task), current_thread);} else {PostDelayedTaskImpl(std::move(task), current_thread);}
}
这里判断是延时任务调用PostDelayedTaskImpl(std::move(task), current_thread) 添加。 如果是立即执行的任务则调用PostImmediateTaskImpl(std::move(task), current_thread) 添加。 我们先分析PostDelayedTaskImpl 添加延期任务。
void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,CurrentThread current_thread) {// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167// for details.CHECK(posted_task.callback);if (current_thread == CurrentThread::kMainThread) {.....PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task), &lazy_now,/* notify_task_annotator */ true);} else {.....PushOntoDelayedIncomingQueue(MakeDelayedTask(std::move(posted_task), &lazy_now));}
}
如果当前线程就是目标线程,调用PushOntoDelayedIncomingQueueFromMainThread() 直接添加延时任务,否则调用PushOntoDelayedIncomingQueue() 添加异步任务。
void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,LazyNow* lazy_now,bool notify_task_annotator) {......main_thread_only().delayed_incoming_queue.push(std::move(pending_task));UpdateWakeUp(lazy_now);TraceQueueSize();
}
PushOntoDelayedIncomingQueueFromMainThread函数在目标线程执行,直接添加任务到TaskQueueImpl::MainThreadOnly->delayed_incoming_queue 队列,并调用TaskQueueImpl->UpdateWakeUp(lazy_now) 更新wakup信息, 这个函数我们前面已经分析过了。 这里也验证了我们的说法, 延期任务的改变不需要更新消息循环的超时时间。
再看看在非目标线程里面添加延期任务
void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {sequence_manager_->WillQueueTask(&pending_task);
......// TODO(altimin): Add a copy method to Task to capture metadata here.auto task_runner = pending_task.task_runner;const auto task_type = pending_task.task_type;PostImmediateTaskImpl(PostedTask(std::move(task_runner),BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,Unretained(this), std::move(pending_task)),FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),CurrentThread::kNotMainThread);
}
PushOntoDelayedIncomingQueue 函数在非目标线程执行, 调用PostImmediateTaskImpl 添加了一个立即执行任务,该任务回调TaskQueueImpl::ScheduleDelayedWorkTask方法,参数是延期执行任务。 我们可以猜测,最终也是在目标线程执行PushOntoDelayedIncomingQueueFromMainThread(), 验证一下
void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);sequence_manager_->MaybeAddLeewayToTask(pending_task);TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();LazyNow lazy_now(now);// 延期任务已经到期,直接添加到 main_thread_only().delayed_incoming_queue,然后调用MoveReadyDelayedTasksToWorkQueue() 移动到delayed_work_queueif (pending_task.earliest_delayed_run_time() <= now) {// If |delayed_run_time| is in the past then push it onto the work queue// immediately. To ensure the right task ordering we need to temporarily// push it onto the |delayed_incoming_queue|.pending_task.delayed_run_time = now;main_thread_only().delayed_incoming_queue.push(std::move(pending_task));MoveReadyDelayedTasksToWorkQueue(&lazy_now, sequence_manager_->GetNextSequenceNumber());} else {// 调用PushOntoDelayedIncomingQueueFromMainThread 添加到delayed_incoming_queue// If |delayed_run_time| is in the future we can queue it as normal.PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),&lazy_now, false);}......
}
果然和我们猜测的一样。
最后我们来看立即任务的添加PostImmediateTaskImpl
void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,CurrentThread current_thread) {// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167// for details.CHECK(task.callback);bool should_schedule_work = false;{// TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue// See https://crbug.com/901800// 获取anythread 锁base::internal::CheckedAutoLock lock(any_thread_lock_);......// 添加到any_thread_.immediate_incoming_queue队列any_thread_.immediate_incoming_queue.push_back(Task(std::move(task), sequence_number, sequence_number, queue_time));......// 设置标志,表示有新的立即执行任务。if (was_immediate_incoming_queue_empty &&any_thread_.immediate_work_queue_empty) {empty_queues_to_reload_handle_.SetActive(true);should_schedule_work =any_thread_.post_immediate_task_should_schedule_work;}}......// 如果需要唤醒消息循环if (should_schedule_work)sequence_manager_->ScheduleWork();......
}
函数按照如下三个步骤执行:
1、获取anythread 锁
2、添加到any_thread_.immediate_incoming_queue队列
3、如果需要唤醒消息循环
最后
到这里就分析完了,还是非常复杂的。有些过度设计了。 不如Android的消息循环简洁。性能上估计也不会有太大优势。聪明的人实现高效简洁的东西。