blob: a488602eb2f2eb9885ac8ad71db2cdc9763c46b6 [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/sequence_manager/thread_controller_with_message_pump_impl.h"
#include "base/auto_reset.h"
#include "base/time/tick_clock.h"
#include "base/trace_event/trace_event.h"
namespace base {
namespace sequence_manager {
namespace internal {
ThreadControllerWithMessagePumpImpl::ThreadControllerWithMessagePumpImpl(
std::unique_ptr<MessagePump> message_pump,
const TickClock* time_source)
: associated_thread_(AssociatedThreadId::CreateUnbound()),
pump_(std::move(message_pump)),
time_source_(time_source) {
scoped_set_sequence_local_storage_map_for_current_thread_ = std::make_unique<
base::internal::ScopedSetSequenceLocalStorageMapForCurrentThread>(
&sequence_local_storage_map_);
RunLoop::RegisterDelegateForCurrentThread(this);
}
ThreadControllerWithMessagePumpImpl::~ThreadControllerWithMessagePumpImpl() {
// Destructors of RunLoop::Delegate and ThreadTaskRunnerHandle
// will do all the clean-up.
// ScopedSetSequenceLocalStorageMapForCurrentThread destructor will
// de-register the current thread as a sequence.
}
ThreadControllerWithMessagePumpImpl::MainThreadOnly::MainThreadOnly() = default;
ThreadControllerWithMessagePumpImpl::MainThreadOnly::~MainThreadOnly() =
default;
void ThreadControllerWithMessagePumpImpl::SetSequencedTaskSource(
SequencedTaskSource* task_source) {
DCHECK(task_source);
DCHECK(!main_thread_only().task_source);
main_thread_only().task_source = task_source;
}
void ThreadControllerWithMessagePumpImpl::SetMessageLoop(
MessageLoop* message_loop) {
NOTREACHED()
<< "ThreadControllerWithMessagePumpImpl doesn't support MessageLoops";
}
void ThreadControllerWithMessagePumpImpl::SetWorkBatchSize(
int work_batch_size) {
DCHECK_GE(work_batch_size, 1);
main_thread_only().batch_size = work_batch_size;
}
void ThreadControllerWithMessagePumpImpl::SetTimerSlack(
TimerSlack timer_slack) {
pump_->SetTimerSlack(timer_slack);
}
void ThreadControllerWithMessagePumpImpl::WillQueueTask(
PendingTask* pending_task) {
task_annotator_.WillQueueTask("ThreadController::Task", pending_task);
}
void ThreadControllerWithMessagePumpImpl::ScheduleWork() {
pump_->ScheduleWork();
}
void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(
LazyNow* lazy_now,
TimeTicks run_time) {
// Since this method must be called on the main thread, we're probably
// inside of DoWork() except some initialization code.
// DoWork() will schedule next wake-up if necessary.
if (is_doing_work())
return;
DCHECK_LT(time_source_->NowTicks(), run_time);
pump_->ScheduleDelayedWork(run_time);
}
const TickClock* ThreadControllerWithMessagePumpImpl::GetClock() {
return time_source_;
}
bool ThreadControllerWithMessagePumpImpl::RunsTasksInCurrentSequence() {
return associated_thread_->thread_id == PlatformThread::CurrentId();
}
void ThreadControllerWithMessagePumpImpl::SetDefaultTaskRunner(
scoped_refptr<SingleThreadTaskRunner> task_runner) {
main_thread_only().thread_task_runner_handle =
std::make_unique<ThreadTaskRunnerHandle>(task_runner);
}
void ThreadControllerWithMessagePumpImpl::RestoreDefaultTaskRunner() {
// There's no default task runner unlike with the MessageLoop.
main_thread_only().thread_task_runner_handle.reset();
}
void ThreadControllerWithMessagePumpImpl::AddNestingObserver(
RunLoop::NestingObserver* observer) {
DCHECK_LE(main_thread_only().run_depth, 1);
DCHECK(!main_thread_only().nesting_observer);
DCHECK(observer);
main_thread_only().nesting_observer = observer;
}
void ThreadControllerWithMessagePumpImpl::RemoveNestingObserver(
RunLoop::NestingObserver* observer) {
DCHECK_EQ(main_thread_only().nesting_observer, observer);
main_thread_only().nesting_observer = nullptr;
}
const scoped_refptr<AssociatedThreadId>&
ThreadControllerWithMessagePumpImpl::GetAssociatedThread() const {
return associated_thread_;
}
bool ThreadControllerWithMessagePumpImpl::DoWork() {
DCHECK(main_thread_only().task_source);
bool task_ran = false;
{
AutoReset<int> do_work_scope(&main_thread_only().do_work_depth,
main_thread_only().do_work_depth + 1);
for (int i = 0; i < main_thread_only().batch_size; i++) {
Optional<PendingTask> task = main_thread_only().task_source->TakeTask();
if (!task)
break;
TRACE_TASK_EXECUTION("ThreadController::Task", *task);
task_annotator_.RunTask("ThreadController::Task", &*task);
task_ran = true;
main_thread_only().task_source->DidRunTask();
if (main_thread_only().quit_do_work) {
// When Quit() is called we must stop running the batch because
// caller expects per-task granularity.
main_thread_only().quit_do_work = false;
return true;
}
}
} // DoWorkScope.
LazyNow lazy_now(time_source_);
TimeDelta do_work_delay =
main_thread_only().task_source->DelayTillNextTask(&lazy_now);
DCHECK_GE(do_work_delay, TimeDelta());
// Schedule a continuation.
if (do_work_delay.is_zero()) {
// Need to run new work immediately.
pump_->ScheduleWork();
} else if (do_work_delay != TimeDelta::Max()) {
// Cancels any previously scheduled delayed wake-ups.
pump_->ScheduleDelayedWork(lazy_now.Now() + do_work_delay);
}
return task_ran;
}
bool ThreadControllerWithMessagePumpImpl::DoDelayedWork(
TimeTicks* next_run_time) {
// Delayed work is getting processed in DoWork().
return false;
}
bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
// RunLoop::Delegate knows whether we called Run() or RunUntilIdle().
if (ShouldQuitWhenIdle())
Quit();
return false;
}
void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
// No system messages are being processed by this class.
DCHECK(application_tasks_allowed);
// We already have a MessagePump::Run() running, so we're in a nested RunLoop.
if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer)
main_thread_only().nesting_observer->OnBeginNestedRunLoop();
{
AutoReset<int> run_scope(&main_thread_only().run_depth,
main_thread_only().run_depth + 1);
// MessagePump::Run() blocks until Quit() called, but previously started
// Run() calls continue to block.
pump_->Run(this);
}
// We'll soon continue to run an outer MessagePump::Run() loop.
if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer)
main_thread_only().nesting_observer->OnExitNestedRunLoop();
}
void ThreadControllerWithMessagePumpImpl::Quit() {
// Interrupt a batch of work.
if (is_doing_work())
main_thread_only().quit_do_work = true;
// If we're in a nested RunLoop, continuation will be posted if necessary.
pump_->Quit();
}
void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() {
ScheduleWork();
}
} // namespace internal
} // namespace sequence_manager
} // namespace base