blob: 912ffc7a6230763d75950683de250162789cf735 [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/sequence_manager/thread_controller_with_message_pump_impl.h"
#include "base/auto_reset.h"
#include "base/time/tick_clock.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"
namespace base {
namespace sequence_manager {
namespace internal {
namespace {
// Clamps |next_run_time| so that it is never later than one day after
// |lazy_now|'s current time. This mitigates https://crbug.com/850450, where
// some platforms are unhappy with delays > 100,000,000 seconds. A diagnosis
// metric showed the cap is harmless in practice: no sleep > 1 hour ever
// completes (it is always interrupted by an earlier MessageLoop event) and 99%
// of completed sleeps are the ones scheduled for <= 1 second.
// Details @ https://crrev.com/c/1142589.
TimeTicks CapAtOneDay(TimeTicks next_run_time, LazyNow* lazy_now) {
  const TimeTicks one_day_from_now = lazy_now->Now() + TimeDelta::FromDays(1);
  return std::min(next_run_time, one_day_from_now);
}
} // namespace
// Builds a controller that takes ownership of |message_pump| and reads time
// from |time_source|. The associated thread id starts out unbound. The
// constructor also creates the ScopedSetSequenceLocalStorageMapForCurrentThread
// helper around |sequence_local_storage_map_| and registers this object as the
// RunLoop delegate for the current thread.
ThreadControllerWithMessagePumpImpl::ThreadControllerWithMessagePumpImpl(
    std::unique_ptr<MessagePump> message_pump,
    const TickClock* time_source)
    : associated_thread_(AssociatedThreadId::CreateUnbound()),
      pump_(std::move(message_pump)),
      time_source_(time_source) {
  using ScopedStorageMapSetter =
      base::internal::ScopedSetSequenceLocalStorageMapForCurrentThread;
  scoped_set_sequence_local_storage_map_for_current_thread_ =
      std::make_unique<ScopedStorageMapSetter>(&sequence_local_storage_map_);

  RunLoop::RegisterDelegateForCurrentThread(this);
}
// Tears the controller down. Most clean-up is delegated to member and base
// class destructors; the only explicit step is giving back the Windows high
// resolution timer activation that DoIdleWork() may have taken.
ThreadControllerWithMessagePumpImpl::~ThreadControllerWithMessagePumpImpl() {
  // Destructors of RunLoop::Delegate and ThreadTaskRunnerHandle
  // will do all the clean-up.
  // ScopedSetSequenceLocalStorageMapForCurrentThread destructor will
  // de-register the current thread as a sequence.

#if defined(OS_WIN)
  // DoIdleWork() turns the high resolution timer on when the task source has
  // pending high resolution tasks. Nothing else turns it back off, so release
  // the activation here to keep it from outliving this controller.
  if (main_thread_only().in_high_res_mode) {
    main_thread_only().in_high_res_mode = false;
    Time::ActivateHighResolutionTimer(false);
  }
#endif  // defined(OS_WIN)
}
// MainThreadOnly needs no custom construction or destruction logic; both
// special members are defaulted here, out of line.
ThreadControllerWithMessagePumpImpl::MainThreadOnly::MainThreadOnly() = default;

ThreadControllerWithMessagePumpImpl::MainThreadOnly::~MainThreadOnly() =
    default;
// Installs the task source that DoWorkImpl() takes tasks from. |task_source|
// must be non-null and no task source may have been installed before.
void ThreadControllerWithMessagePumpImpl::SetSequencedTaskSource(
    SequencedTaskSource* task_source) {
  DCHECK(task_source);

  auto& state = main_thread_only();
  DCHECK(!state.task_source);
  state.task_source = task_source;
}
// This controller is driven by a MessagePump, not a MessageLoop, so this
// method must never be reached; |message_loop| is ignored.
void ThreadControllerWithMessagePumpImpl::SetMessageLoop(
    MessageLoop* message_loop) {
  NOTREACHED()
      << "ThreadControllerWithMessagePumpImpl doesn't support MessageLoops";
}
// Sets the upper bound on how many tasks a single DoWorkImpl() pass may run.
// |work_batch_size| must be at least 1.
void ThreadControllerWithMessagePumpImpl::SetWorkBatchSize(
    int work_batch_size) {
  DCHECK_GE(work_batch_size, 1);

  auto& state = main_thread_only();
  state.work_batch_size = work_batch_size;
}
// Forwards |timer_slack| to the underlying message pump.
void ThreadControllerWithMessagePumpImpl::SetTimerSlack(
    TimerSlack timer_slack) {
  pump_->SetTimerSlack(timer_slack);
}
// Notifies |task_annotator_| that |pending_task| is about to be queued, using
// the same "ThreadController::Task" name that DoWorkImpl() passes to RunTask().
void ThreadControllerWithMessagePumpImpl::WillQueueTask(
    PendingTask* pending_task) {
  task_annotator_.WillQueueTask("ThreadController::Task", pending_task);
}
// Requests an immediate DoWork from the pump. Requests made on the bound
// thread are deduplicated; requests from other threads are always forwarded.
void ThreadControllerWithMessagePumpImpl::ScheduleWork() {
  // This assumes that cross thread ScheduleWork isn't frequent enough to
  // warrant ScheduleWork deduplication, so such calls go straight to the pump.
  if (!RunsTasksInCurrentSequence()) {
    pump_->ScheduleWork();
    return;
  }

  // Don't post a DoWork if there's an immediate DoWork in flight or if we're
  // inside a top level DoWork. We can rely on a continuation being posted as
  // needed.
  const bool already_covered =
      main_thread_only().immediate_do_work_posted || InTopLevelDoWork();
  if (already_covered)
    return;

  main_thread_only().immediate_do_work_posted = true;
  pump_->ScheduleWork();
}
// Asks the pump for a delayed wake-up at |run_time|, which must be later than
// |lazy_now|'s current time. The request is dropped when |run_time| matches the
// wake-up time already recorded in |next_delayed_do_work|, when an immediate
// DoWork is in flight, or when called from inside a top level DoWork.
void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(
    LazyNow* lazy_now,
    TimeTicks run_time) {
  // Compare against the caller's notion of "now" rather than re-reading the
  // clock: time may have advanced since the caller computed |run_time|.
  DCHECK_LT(lazy_now->Now(), run_time);

  if (main_thread_only().next_delayed_do_work == run_time)
    return;

  // Don't post a DoWork if there's an immediate DoWork in flight or if we're
  // inside a top level DoWork. We can rely on a continuation being posted as
  // needed.
  if (main_thread_only().immediate_do_work_posted || InTopLevelDoWork())
    return;

  // Remember the exact time that was requested so that the equality check
  // above also deduplicates requests that are more than a day away; only the
  // value handed to the pump is capped.
  main_thread_only().next_delayed_do_work = run_time;
  pump_->ScheduleDelayedWork(CapAtOneDay(run_time, lazy_now));
}
// Returns the tick clock this controller reads time from (|time_source_|).
const TickClock* ThreadControllerWithMessagePumpImpl::GetClock() {
  return time_source_;
}
bool ThreadControllerWithMessagePumpImpl::RunsTasksInCurrentSequence() {
return associated_thread_->thread_id == PlatformThread::CurrentId();
}
// Makes |task_runner| the task runner exposed through a ThreadTaskRunnerHandle
// owned by this controller, replacing any handle installed by an earlier call.
void ThreadControllerWithMessagePumpImpl::SetDefaultTaskRunner(
    scoped_refptr<SingleThreadTaskRunner> task_runner) {
  // Destroy the previous handle (if any) before constructing its replacement.
  // Only one ThreadTaskRunnerHandle is expected to exist on a thread at a
  // time, so building the new one while the old one is still alive would
  // overlap the two, and the old handle's destruction would then run after the
  // new handle's registration.
  main_thread_only().thread_task_runner_handle.reset();

  // |task_runner| is a by-value sink parameter; move it to avoid an extra
  // reference count round trip.
  main_thread_only().thread_task_runner_handle =
      std::make_unique<ThreadTaskRunnerHandle>(std::move(task_runner));
}
// Drops the ThreadTaskRunnerHandle installed by SetDefaultTaskRunner().
void ThreadControllerWithMessagePumpImpl::RestoreDefaultTaskRunner() {
  // There's no default task runner unlike with the MessageLoop, so there is
  // nothing to restore: simply destroy the handle.
  main_thread_only().thread_task_runner_handle = nullptr;
}
// Registers the single |observer| that OnBeginNestedRunLoop() and
// OnExitNestedRunLoop() forward to, and subscribes this controller to the
// current thread's RunLoop nesting notifications. |observer| must be non-null
// and no observer may already be registered.
void ThreadControllerWithMessagePumpImpl::AddNestingObserver(
    RunLoop::NestingObserver* observer) {
  auto& state = main_thread_only();
  DCHECK(!state.nesting_observer);
  DCHECK(observer);
  state.nesting_observer = observer;

  RunLoop::AddNestingObserverOnCurrentThread(this);
}
// Unregisters |observer|, which must be the observer currently registered, and
// unsubscribes this controller from the current thread's RunLoop nesting
// notifications.
void ThreadControllerWithMessagePumpImpl::RemoveNestingObserver(
    RunLoop::NestingObserver* observer) {
  auto& state = main_thread_only();
  DCHECK_EQ(state.nesting_observer, observer);
  state.nesting_observer = nullptr;

  RunLoop::RemoveNestingObserverOnCurrentThread(this);
}
// Returns a reference to the AssociatedThreadId held by this controller.
const scoped_refptr<AssociatedThreadId>&
ThreadControllerWithMessagePumpImpl::GetAssociatedThread() const {
  return associated_thread_;
}
bool ThreadControllerWithMessagePumpImpl::DoWork() {
base::TimeTicks next_run_time;
main_thread_only().immediate_do_work_posted = false;
return DoWorkImpl(&next_run_time);
}
// Delayed-work entry point. Resets the recorded delayed wake-up time to
// TimeTicks::Max() and delegates to DoWorkImpl(), which fills in
// |next_run_time|. Returns DoWorkImpl()'s result.
bool ThreadControllerWithMessagePumpImpl::DoDelayedWork(
    TimeTicks* next_run_time) {
  auto& state = main_thread_only();
  state.next_delayed_do_work = TimeTicks::Max();

  return DoWorkImpl(next_run_time);
}
// Shared implementation of DoWork() and DoDelayedWork(). Runs up to
// |work_batch_size| tasks taken from the task source, then decides how the
// next wake-up is arranged.
//
// |next_run_time| is an out-parameter: it is set to the current time when more
// work is immediately ready, to the (capped) time of the next delayed task
// when there is one, or to TimeTicks::Max() when the task source reports no
// upcoming task. It is left untouched when the batch was interrupted by
// Quit().
//
// Returns true if at least one task ran or if more work is immediately ready.
bool ThreadControllerWithMessagePumpImpl::DoWorkImpl(
base::TimeTicks* next_run_time) {
DCHECK(main_thread_only().task_source);
bool task_ran = false;
// Mark that a DoWork is running; InTopLevelDoWork() compares this counter
// against the nesting depth.
main_thread_only().do_work_running_count++;
// |work_batch_size| is re-read on every iteration.
for (int i = 0; i < main_thread_only().work_batch_size; i++) {
Optional<PendingTask> task = main_thread_only().task_source->TakeTask();
// No task available: end the batch early.
if (!task)
break;
TRACE_TASK_EXECUTION("ThreadController::Task", *task);
task_annotator_.RunTask("ThreadController::Task", &*task);
task_ran = true;
main_thread_only().task_source->DidRunTask();
// When Quit() is called we must stop running the batch because the caller
// expects per-task granularity.
if (main_thread_only().quit_do_work)
break;
}
main_thread_only().do_work_running_count--;
// The batch was interrupted by Quit(): consume the flag and return without
// scheduling a continuation or touching |next_run_time|.
if (main_thread_only().quit_do_work) {
main_thread_only().quit_do_work = false;
return task_ran;
}
LazyNow lazy_now(time_source_);
TimeDelta do_work_delay =
main_thread_only().task_source->DelayTillNextTask(&lazy_now);
DCHECK_GE(do_work_delay, TimeDelta());
// Schedule a continuation.
// TODO(altimin, gab): Make this more efficient by merging DoWork
// and DoDelayedWork and allowing returning base::TimeTicks() when we have
// immediate work.
if (do_work_delay.is_zero()) {
// Need to run new work immediately, but due to the contract of DoWork we
// only need to return true to ensure that happens.
*next_run_time = lazy_now.Now();
// Setting this flag makes ScheduleWork() and SetNextDelayedDoWork() skip
// posting until the next DoWork() clears it.
main_thread_only().immediate_do_work_posted = true;
return true;
} else if (do_work_delay != TimeDelta::Max()) {
*next_run_time = CapAtOneDay(lazy_now.Now() + do_work_delay, &lazy_now);
// Cancels any previously scheduled delayed wake-ups.
pump_->ScheduleDelayedWork(*next_run_time);
} else {
// A delay of TimeDelta::Max() is treated as "no upcoming task".
*next_run_time = base::TimeTicks::Max();
}
return task_ran;
}
// Returns true when the number of DoWorkImpl() calls currently running exceeds
// the current RunLoop nesting depth.
bool ThreadControllerWithMessagePumpImpl::InTopLevelDoWork() const {
  const auto& state = main_thread_only();
  return state.do_work_running_count > state.nesting_depth;
}
// Idle hook. Quits the pump if the RunLoop delegate reports that it should
// quit when idle and, on Windows, keeps the high resolution timer in sync with
// the task source's needs. Always returns false.
bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
  // RunLoop::Delegate knows whether we called Run() or RunUntilIdle().
  if (ShouldQuitWhenIdle())
    Quit();

#if defined(OS_WIN)
  // On Windows we activate the high resolution timer so that the wait
  // _if_ triggered by the timer happens with good resolution. If we don't
  // do this the default resolution is 15ms which might not be acceptable
  // for some tasks. The timer is only toggled when the requirement changes.
  const bool wants_high_res =
      main_thread_only().task_source->HasPendingHighResolutionTasks();
  const bool mode_changed =
      main_thread_only().in_high_res_mode != wants_high_res;
  if (mode_changed) {
    main_thread_only().in_high_res_mode = wants_high_res;
    Time::ActivateHighResolutionTimer(wants_high_res);
  }
#endif  // defined(OS_WIN)

  return false;
}
// Runs the message pump with this controller as its delegate.
// |application_tasks_allowed| must be true.
void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
  // No system messages are being processed by this class, so running with
  // application tasks disallowed is not supported.
  DCHECK(application_tasks_allowed);

  // MessagePump::Run() blocks until Quit() called, but previously started
  // Run() calls continue to block.
  pump_->Run(this);
}
// Called when a nested RunLoop begins: bumps the tracked nesting depth and
// forwards the notification to the registered nesting observer, if any.
void ThreadControllerWithMessagePumpImpl::OnBeginNestedRunLoop() {
  ++main_thread_only().nesting_depth;

  if (auto* observer = main_thread_only().nesting_observer)
    observer->OnBeginNestedRunLoop();
}
// Called when a nested RunLoop exits: lowers the tracked nesting depth (which
// must not go negative) and forwards the notification to the registered
// nesting observer, if any.
void ThreadControllerWithMessagePumpImpl::OnExitNestedRunLoop() {
  --main_thread_only().nesting_depth;
  DCHECK_GE(main_thread_only().nesting_depth, 0);

  if (auto* observer = main_thread_only().nesting_observer)
    observer->OnExitNestedRunLoop();
}
// Quits the message pump. When called from inside a top level DoWork, it also
// flags the running batch so DoWorkImpl() stops after the current task.
void ThreadControllerWithMessagePumpImpl::Quit() {
  // Interrupt a batch of work.
  const bool interrupt_batch = InTopLevelDoWork();
  if (interrupt_batch)
    main_thread_only().quit_do_work = true;

  // If we're in a nested RunLoop, continuation will be posted if necessary.
  pump_->Quit();
}
void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() {
main_thread_only().immediate_do_work_posted = true;
ScheduleWork();
}
} // namespace internal
} // namespace sequence_manager
} // namespace base