blob: 86f63272011d9b8b3846330c86d04c8bb6abf59a [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task_scheduler/scheduler_worker_pool_impl.h"
#include <stddef.h>
#include <memory>
#include <unordered_set>
#include <vector>
#include "base/atomicops.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/metrics/histogram.h"
#include "base/metrics/histogram_samples.h"
#include "base/metrics/statistics_recorder.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/sequence_sort_key.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/task_scheduler/test_task_factory.h"
#include "base/task_scheduler/test_utils.h"
#include "base/test/gtest_util.h"
#include "base/test/test_simple_task_runner.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker_impl.h"
#include "base/threading/thread_local_storage.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace internal {
namespace {
// Number of workers requested when the fixtures below start a worker pool.
constexpr size_t kNumWorkersInWorkerPool = 4;
// Number of ThreadPostingTasks threads started by the multi-threaded posting
// tests.
constexpr size_t kNumThreadsPostingTasks = 4;
// Number of tasks posted by each ThreadPostingTasks (also the number of short
// tasks posted by PostTasksWithOneAvailableWorker).
constexpr size_t kNumTasksPostedPerThread = 150;
// Suggested reclaim time passed to worker pools by the tests that exercise
// worker detachment.
// This can't be lower because Windows' WaitableEvent wakes up too early when a
// small timeout is used. This results in many spurious wake ups before a worker
// is allowed to detach.
constexpr TimeDelta kReclaimTimeForDetachTests =
TimeDelta::FromMilliseconds(500);
// Extra time slept on top of |kReclaimTimeForDetachTests| by tests that wait
// for workers to detach.
constexpr TimeDelta kExtraTimeToWaitForDetach =
TimeDelta::FromSeconds(1);
// Shorthand for the standby policy enum used when building
// SchedulerWorkerPoolParams.
using StandbyThreadPolicy = SchedulerWorkerPoolParams::StandbyThreadPolicy;
// Fixture shared by the tests in this file. It owns the worker pool under test
// together with the TaskTracker, service thread and DelayedTaskManager that are
// handed to it. The test parameter is the ExecutionMode used by TEST_P tests
// when they create TaskRunners.
class TaskSchedulerWorkerPoolImplTest
    : public testing::TestWithParam<test::ExecutionMode> {
 protected:
  TaskSchedulerWorkerPoolImplTest()
      : service_thread_("TaskSchedulerServiceThread") {}

  // Creates and starts a pool of |kNumWorkersInWorkerPool| workers with
  // TimeDelta::Max() as suggested reclaim time.
  void SetUp() override {
    CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
  }

  // Stops the service thread, then waits for every worker to be idle before
  // joining the pool.
  void TearDown() override {
    service_thread_.Stop();
    worker_pool_->WaitForAllWorkersIdleForTesting();
    worker_pool_->JoinForTesting();
  }

  // Starts the service thread and constructs |delayed_task_manager_| and
  // |worker_pool_| without starting the pool. Asserts that neither object
  // exists yet.
  void CreateWorkerPool() {
    ASSERT_FALSE(worker_pool_);
    ASSERT_FALSE(delayed_task_manager_);

    service_thread_.Start();
    delayed_task_manager_ =
        MakeUnique<DelayedTaskManager>(service_thread_.task_runner());

    worker_pool_ = MakeUnique<SchedulerWorkerPoolImpl>(
        "TestWorkerPool", ThreadPriority::NORMAL,
        Bind(&TaskSchedulerWorkerPoolImplTest::ReEnqueueSequenceCallback,
             Unretained(this)),
        &task_tracker_, delayed_task_manager_.get());
    ASSERT_TRUE(worker_pool_);
  }

  // Starts the previously created pool with |num_workers| workers, the LAZY
  // standby policy and |suggested_reclaim_time|.
  void StartWorkerPool(TimeDelta suggested_reclaim_time, size_t num_workers) {
    ASSERT_TRUE(worker_pool_);
    worker_pool_->Start(SchedulerWorkerPoolParams(
        "TestWorkerPool", ThreadPriority::NORMAL, StandbyThreadPolicy::LAZY,
        num_workers, suggested_reclaim_time));
  }

  // Convenience wrapper: CreateWorkerPool() followed by StartWorkerPool().
  void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time,
                                size_t num_workers) {
    CreateWorkerPool();
    StartWorkerPool(suggested_reclaim_time, num_workers);
  }

  std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;
  TaskTracker task_tracker_;
  Thread service_thread_;
  std::unique_ptr<DelayedTaskManager> delayed_task_manager_;

 private:
  // Puts |sequence| back into |worker_pool_| using the sequence's own sort
  // key.
  void ReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) {
    // In production code, this callback would be implemented by the
    // TaskScheduler which would first determine which PriorityQueue the
    // sequence must be re-enqueued.
    // The sort key is read before |sequence| is moved from.
    const SequenceSortKey sort_key = sequence->GetSortKey();
    worker_pool_->ReEnqueueSequence(std::move(sequence), sort_key);
  }

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTest);
};
// Returns a TaskRunner from |worker_pool| matching |execution_mode|. Only
// PARALLEL and SEQUENCED are handled; any other mode records a test failure
// and yields nullptr.
scoped_refptr<TaskRunner> CreateTaskRunnerWithExecutionMode(
    SchedulerWorkerPoolImpl* worker_pool,
    test::ExecutionMode execution_mode) {
  // Allow tasks posted to the returned TaskRunner to wait on a WaitableEvent.
  const TaskTraits traits = TaskTraits().WithBaseSyncPrimitives();

  if (execution_mode == test::ExecutionMode::PARALLEL)
    return worker_pool->CreateTaskRunnerWithTraits(traits);

  if (execution_mode == test::ExecutionMode::SEQUENCED)
    return worker_pool->CreateSequencedTaskRunnerWithTraits(traits);

  // Every other mode is unsupported by these tests.
  ADD_FAILURE() << "Unexpected ExecutionMode";
  return nullptr;
}
using PostNestedTask = test::TestTaskFactory::PostNestedTask;

// Thread that posts |kNumTasksPostedPerThread| tasks to a worker pool through
// a TestTaskFactory.
class ThreadPostingTasks : public SimpleThread {
 public:
  enum class WaitBeforePostTask {
    NO_WAIT,
    WAIT_FOR_ALL_WORKERS_IDLE,
  };

  // Constructs a thread that posts tasks to |worker_pool| through an
  // |execution_mode| task runner. If |wait_before_post_task| is
  // WAIT_FOR_ALL_WORKERS_IDLE, the thread waits until all workers in
  // |worker_pool| are idle before posting a new task. If |post_nested_task| is
  // YES, each task posted by this thread posts another task when it runs.
  ThreadPostingTasks(SchedulerWorkerPoolImpl* worker_pool,
                     test::ExecutionMode execution_mode,
                     WaitBeforePostTask wait_before_post_task,
                     PostNestedTask post_nested_task)
      : SimpleThread("ThreadPostingTasks"),
        worker_pool_(worker_pool),
        wait_before_post_task_(wait_before_post_task),
        post_nested_task_(post_nested_task),
        factory_(CreateTaskRunnerWithExecutionMode(worker_pool, execution_mode),
                 execution_mode) {
    DCHECK(worker_pool_);
  }

  // Factory through which this thread posts its tasks; lets the test wait for
  // those tasks to run.
  const test::TestTaskFactory* factory() const { return &factory_; }

 private:
  // Thread body: posts |kNumTasksPostedPerThread| tasks, optionally waiting
  // for the pool to be idle before each post.
  void Run() override {
    // This thread is not a worker of the pool, so the factory's TaskRunner
    // must not claim to run tasks on it.
    EXPECT_FALSE(factory_.task_runner()->RunsTasksOnCurrentThread());

    for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
      if (wait_before_post_task_ ==
          WaitBeforePostTask::WAIT_FOR_ALL_WORKERS_IDLE) {
        worker_pool_->WaitForAllWorkersIdleForTesting();
      }
      EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure()));
    }
  }

  SchedulerWorkerPoolImpl* const worker_pool_;
  const WaitBeforePostTask wait_before_post_task_;
  const PostNestedTask post_nested_task_;
  test::TestTaskFactory factory_;

  DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
};

using WaitBeforePostTask = ThreadPostingTasks::WaitBeforePostTask;
// Task body for tasks that are expected to never be executed; records a test
// failure if it is ever run.
void ShouldNotRun() {
ADD_FAILURE() << "Ran a task that shouldn't run.";
}
} // namespace
// Posts tasks from several threads at once and checks that they all run.
TEST_P(TaskSchedulerWorkerPoolImplTest, PostTasks) {
  // Start the posting threads; each one begins posting as soon as it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t index = 0; index < kNumThreadsPostingTasks; ++index) {
    auto poster = MakeUnique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT,
        PostNestedTask::NO);
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Each thread must finish posting, and everything it posted must have run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after |posters| is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
// Same as PostTasks, except that every posting thread waits for the whole pool
// to be idle before each post.
TEST_P(TaskSchedulerWorkerPoolImplTest, PostTasksWaitAllWorkersIdle) {
  // To verify that workers can sleep and be woken up when new tasks are
  // posted, the posting threads wait for all workers to become idle before
  // posting a new task.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t index = 0; index < kNumThreadsPostingTasks; ++index) {
    auto poster = MakeUnique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam(),
        WaitBeforePostTask::WAIT_FOR_ALL_WORKERS_IDLE, PostNestedTask::NO);
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Each thread must finish posting, and everything it posted must have run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Wait until all workers are idle to be sure that no task accesses its
  // TestTaskFactory after |posters| is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
// Posts tasks that themselves post another task, and checks that all of them
// run.
TEST_P(TaskSchedulerWorkerPoolImplTest, NestedPostTasks) {
  // Each task posted by these threads will post another task when it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posters;
  for (size_t index = 0; index < kNumThreadsPostingTasks; ++index) {
    auto poster = MakeUnique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT,
        PostNestedTask::YES);
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Each thread must finish posting, and everything it posted must have run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Wait until all workers are idle to be sure that no task accesses its
  // TestTaskFactory after |posters| is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
// Checks that tasks still make progress when all workers but one are blocked.
TEST_P(TaskSchedulerWorkerPoolImplTest, PostTasksWithOneAvailableWorker) {
  // Blocking tasks wait on |event|, which stays unsignaled until the short
  // tasks have run.
  WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED);

  // Keep all workers busy except one. Use different factories so that tasks
  // are added to different sequences and can run simultaneously when the
  // execution mode is SEQUENCED.
  constexpr size_t kNumBlockedWorkers = kNumWorkersInWorkerPool - 1;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
  for (size_t index = 0; index < kNumBlockedWorkers; ++index) {
    auto blocked_factory = MakeUnique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam());
    EXPECT_TRUE(blocked_factory->PostTask(
        PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
    blocked_factory->WaitForAllTasksToRun();
    blocked_task_factories.push_back(std::move(blocked_factory));
  }

  // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
  // that only one worker in |worker_pool_| isn't busy.
  test::TestTaskFactory short_task_factory(
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
      GetParam());
  for (size_t posted = 0; posted < kNumTasksPostedPerThread; ++posted) {
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, Closure()));
  }
  short_task_factory.WaitForAllTasksToRun();

  // Release tasks waiting on |event|.
  event.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
// Verify that it is possible to have |kNumWorkersInWorkerPool|
// tasks/sequences running simultaneously.
TEST_P(TaskSchedulerWorkerPoolImplTest, Saturate) {
  WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED);

  // Use different factories so that the blocking tasks are added to different
  // sequences and can run simultaneously when the execution mode is SEQUENCED.
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t index = 0; index < kNumWorkersInWorkerPool; ++index) {
    auto blocking_factory = MakeUnique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam());
    EXPECT_TRUE(blocking_factory->PostTask(
        PostNestedTask::NO, Bind(&WaitableEvent::Wait, Unretained(&event))));
    blocking_factory->WaitForAllTasksToRun();
    factories.push_back(std::move(blocking_factory));
  }

  // Release tasks waiting on |event|.
  event.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
// Verify that a Task can't be posted after shutdown.
TEST_P(TaskSchedulerWorkerPoolImplTest, PostTaskAfterShutdown) {
  // The TaskRunner is created before shutdown; only the post happens after.
  auto task_runner =
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());

  task_tracker_.Shutdown();

  const bool was_posted =
      task_runner->PostTask(FROM_HERE, Bind(&ShouldNotRun));
  EXPECT_FALSE(was_posted);
}
// Verify that a Task runs shortly after its delay expires.
TEST_P(TaskSchedulerWorkerPoolImplTest, PostDelayedTask) {
  const TimeTicks start_time = TimeTicks::Now();

  // Post a task with a short delay; the task signals |task_ran| when it runs.
  WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
                         WaitableEvent::InitialState::NOT_SIGNALED);
  EXPECT_TRUE(
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam())
          ->PostDelayedTask(
              FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&task_ran)),
              TestTimeouts::tiny_timeout()));

  // Wait until the task runs.
  task_ran.Wait();

  // Expect the task to run after its delay expires, but not more than 250 ms
  // after that.
  const TimeDelta actual_delay = TimeTicks::Now() - start_time;
  const TimeDelta earliest_allowed = TestTimeouts::tiny_timeout();
  const TimeDelta latest_allowed =
      TimeDelta::FromMilliseconds(250) + TestTimeouts::tiny_timeout();
  EXPECT_GE(actual_delay, earliest_allowed);
  EXPECT_LT(actual_delay, latest_allowed);
}
// Verify that the RunsTasksOnCurrentThread() method of a SEQUENCED TaskRunner
// returns false when called from a task that isn't part of the sequence. Note:
// Tests that use TestTaskFactory already verify that RunsTasksOnCurrentThread()
// returns true when appropriate so this method complements it to get full
// coverage of that method.
TEST_P(TaskSchedulerWorkerPoolImplTest, SequencedRunsTasksOnCurrentThread) {
  // |task_runner| runs the probing task; |sequenced_task_runner| is a separate
  // sequenced runner that the probing task does not belong to.
  auto task_runner =
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());
  auto sequenced_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(TaskTraits());

  WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL,
                         WaitableEvent::InitialState::NOT_SIGNALED);

  task_runner->PostTask(
      FROM_HERE,
      Bind(
          [](scoped_refptr<TaskRunner> other_sequence_runner,
             WaitableEvent* done) {
            EXPECT_FALSE(other_sequence_runner->RunsTasksOnCurrentThread());
            done->Signal();
          },
          sequenced_task_runner, Unretained(&task_ran)));

  // Block until the probing task has performed its check.
  task_ran.Wait();
}
// Instantiate every TaskSchedulerWorkerPoolImplTest TEST_P once with the
// PARALLEL execution mode and once with the SEQUENCED execution mode.
INSTANTIATE_TEST_CASE_P(Parallel,
TaskSchedulerWorkerPoolImplTest,
::testing::Values(test::ExecutionMode::PARALLEL));
INSTANTIATE_TEST_CASE_P(Sequenced,
TaskSchedulerWorkerPoolImplTest,
::testing::Values(test::ExecutionMode::SEQUENCED));
namespace {
// Fixture variant whose SetUp() only creates the worker pool, so that a test
// can post tasks before starting the pool itself.
class TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest
: public TaskSchedulerWorkerPoolImplTest {
public:
// Creates the pool but deliberately does not start it.
void SetUp() override {
CreateWorkerPool();
// Let the test start the worker pool.
}
};
// Task body used by PostTasksBeforeStart. Stores the reference of the thread it
// runs on in |*platform_thread_ref|, signals |task_scheduled| to report that it
// is running, then blocks until |barrier| is signaled.
void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
WaitableEvent* task_scheduled,
WaitableEvent* barrier) {
// Record the thread before signaling so the test reads a value that is set.
*platform_thread_ref = PlatformThread::CurrentRef();
task_scheduled->Signal();
barrier->Wait();
}
} // namespace
// Verify that 2 tasks posted before Start() to a SchedulerWorkerPoolImpl with
// more than 2 workers are scheduled on different workers when Start() is
// called.
TEST_F(TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest,
       PostTasksBeforeStart) {
  PlatformThreadRef task_1_thread_ref;
  PlatformThreadRef task_2_thread_ref;
  WaitableEvent task_1_scheduled(WaitableEvent::ResetPolicy::MANUAL,
                                 WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent task_2_scheduled(WaitableEvent::ResetPolicy::MANUAL,
                                 WaitableEvent::InitialState::NOT_SIGNALED);

  // This event is used to prevent a task from completing before the other task
  // is scheduled. If that happened, both tasks could run on the same worker and
  // this test couldn't verify that the correct number of workers were woken up.
  WaitableEvent barrier(WaitableEvent::ResetPolicy::MANUAL,
                        WaitableEvent::InitialState::NOT_SIGNALED);

  // Posts one TaskPostedBeforeStart task through a freshly created parallel
  // TaskRunner. The task reports its thread in |thread_ref| and signals
  // |scheduled| before blocking on |barrier|.
  const auto post_blocking_task = [this, &barrier](
                                      PlatformThreadRef* thread_ref,
                                      WaitableEvent* scheduled) {
    worker_pool_
        ->CreateTaskRunnerWithTraits(TaskTraits().WithBaseSyncPrimitives())
        ->PostTask(FROM_HERE,
                   Bind(&TaskPostedBeforeStart, Unretained(thread_ref),
                        Unretained(scheduled), Unretained(&barrier)));
  };
  post_blocking_task(&task_1_thread_ref, &task_1_scheduled);
  post_blocking_task(&task_2_thread_ref, &task_2_scheduled);

  // Workers should not be created and tasks should not run before the pool is
  // started.
  EXPECT_EQ(0U, worker_pool_->NumberOfAliveWorkersForTesting());
  EXPECT_FALSE(task_1_scheduled.IsSignaled());
  EXPECT_FALSE(task_2_scheduled.IsSignaled());

  StartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);

  // Tasks should be scheduled shortly after the pool is started.
  task_1_scheduled.Wait();
  task_2_scheduled.Wait();

  // Tasks should be scheduled on different threads.
  EXPECT_NE(task_1_thread_ref, task_2_thread_ref);

  // Let both tasks finish and wait for the TaskTracker to flush them.
  barrier.Signal();
  task_tracker_.Flush();
}
namespace {
// Non-zero marker stored in a worker thread's TLS slot to tag the thread.
constexpr size_t kMagicTlsValue = 42;

// Fixture that tags worker threads through a TLS slot so that a test can later
// count how many tasks run on threads that carry no tag.
class TaskSchedulerWorkerPoolCheckTlsReuse
    : public TaskSchedulerWorkerPoolImplTest {
 public:
  // Tags the current thread with |kMagicTlsValue|, then blocks until |waiter_|
  // is signaled.
  void SetTlsValueAndWait() {
    slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
    waiter_.Wait();
  }

  // Increments |zero_tls_values_| when the current thread's slot is empty,
  // signals |count_waiter|, then blocks until |waiter_| is signaled.
  void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
    const bool slot_is_empty = slot_.Get() == nullptr;
    if (slot_is_empty)
      subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);

    count_waiter->Signal();
    waiter_.Wait();
  }

 protected:
  TaskSchedulerWorkerPoolCheckTlsReuse()
      : waiter_(WaitableEvent::ResetPolicy::MANUAL,
                WaitableEvent::InitialState::NOT_SIGNALED) {}

  // Uses |kReclaimTimeForDetachTests| as the suggested reclaim time instead of
  // the base fixture's TimeDelta::Max().
  void SetUp() override {
    CreateAndStartWorkerPool(kReclaimTimeForDetachTests,
                             kNumWorkersInWorkerPool);
  }

  // Number of CountZeroTlsValuesAndWait() calls that found an empty slot.
  subtle::Atomic32 zero_tls_values_ = 0;

  // Event on which both task bodies above block.
  WaitableEvent waiter_;

 private:
  ThreadLocalStorage::Slot slot_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
};
} // namespace
// Checks that at least one thread has detached by checking the TLS.
TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckDetachedThreads) {
  // Saturate the threads and mark each thread with a magic TLS value.
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t i = 0; i < kNumWorkersInWorkerPool; ++i) {
    factories.push_back(MakeUnique<test::TestTaskFactory>(
        worker_pool_->CreateTaskRunnerWithTraits(
            TaskTraits().WithBaseSyncPrimitives()),
        test::ExecutionMode::PARALLEL));
    ASSERT_TRUE(factories.back()->PostTask(
        PostNestedTask::NO,
        Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
             Unretained(this))));
    factories.back()->WaitForAllTasksToRun();
  }

  // Release tasks waiting on |waiter_|.
  waiter_.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // All threads should be done running by now, so reset for the next phase.
  waiter_.Reset();

  // Give the worker pool a chance to detach its threads.
  PlatformThread::Sleep(kReclaimTimeForDetachTests + kExtraTimeToWaitForDetach);

  worker_pool_->DisallowWorkerDetachmentForTesting();

  // Saturate and count the threads that do not have the magic TLS value. If the
  // value is not there, that means we're at a new thread.
  std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
  for (auto& factory : factories) {
    // MakeUnique avoids a raw |new| and matches how WaitableEvents are
    // allocated elsewhere in this file.
    count_waiters.push_back(
        MakeUnique<WaitableEvent>(WaitableEvent::ResetPolicy::MANUAL,
                                  WaitableEvent::InitialState::NOT_SIGNALED));
    ASSERT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait,
             Unretained(this), count_waiters.back().get())));
    factory->WaitForAllTasksToRun();
  }

  // Wait for all counters to complete.
  for (auto& count_waiter : count_waiters)
    count_waiter->Wait();

  EXPECT_GT(subtle::NoBarrier_Load(&zero_tls_values_), 0);

  // Release tasks waiting on |waiter_|.
  waiter_.Signal();
}
namespace {
// Fixture for the histogram tests. Unlike the base fixture it does not create a
// worker pool in SetUp(); each test creates and starts its own.
class TaskSchedulerWorkerPoolHistogramTest
: public TaskSchedulerWorkerPoolImplTest {
public:
TaskSchedulerWorkerPoolHistogramTest() = default;
protected:
// Override SetUp() to allow every test case to initialize a worker pool with
// its own arguments.
void SetUp() override {}
private:
// Temporary StatisticsRecorder that lives as long as the fixture.
// NOTE(review): presumably isolates the histogram samples recorded by each
// test from other tests in the process - confirm.
std::unique_ptr<StatisticsRecorder> statistics_recorder_ =
StatisticsRecorder::CreateTemporaryForTesting();
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest);
};
} // namespace
// Checks that a worker which runs 3 tasks back to back records a single sample
// of 3 in the NumTasksBetweenWaits histogram.
TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) {
  WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
                      WaitableEvent::InitialState::NOT_SIGNALED);
  CreateAndStartWorkerPool(TimeDelta::Max(), kNumWorkersInWorkerPool);
  auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits(
      TaskTraits().WithBaseSyncPrimitives());

  // Post a task that blocks until |event| is signaled.
  task_runner->PostTask(FROM_HERE,
                        Bind(&WaitableEvent::Wait, Unretained(&event)));

  // Post 2 more tasks while the first task hasn't completed its execution. It
  // is guaranteed that these tasks will run immediately after the first task,
  // without allowing the worker to sleep.
  constexpr int kNumFollowUpTasks = 2;
  for (int posted = 0; posted < kNumFollowUpTasks; ++posted)
    task_runner->PostTask(FROM_HERE, Bind(&DoNothing));

  // Allow tasks to run and wait until the SchedulerWorker is idle.
  event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Wake up the SchedulerWorker that just became idle by posting a task and
  // wait until it becomes idle again. The SchedulerWorker should record the
  // TaskScheduler.NumTasksBetweenWaits.* histogram on wake up.
  task_runner->PostTask(FROM_HERE, Bind(&DoNothing));
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Verify that counts were recorded to the histogram as expected: one sample
  // of 3, and nothing at 0 or 10.
  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
}
namespace {
// Task body that signals |signal_event| and then blocks until |wait_event| is
// signaled.
void SignalAndWaitEvent(WaitableEvent* signal_event,
WaitableEvent* wait_event) {
signal_event->Signal();
wait_event->Wait();
}
} // namespace
// Checks the NumTasksBetweenWaits histogram when workers are given time to
// detach between two rounds of pool-saturating tasks.
TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaitsWithDetach) {
  WaitableEvent tasks_can_exit_event(WaitableEvent::ResetPolicy::MANUAL,
                                     WaitableEvent::InitialState::NOT_SIGNALED);
  CreateAndStartWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool);
  auto task_runner = worker_pool_->CreateTaskRunnerWithTraits(
      TaskTraits().WithBaseSyncPrimitives());

  std::vector<std::unique_ptr<WaitableEvent>> task_started_events;

  // Posts |kNumWorkersInWorkerPool| tasks that each signal their own "started"
  // event and then block on |tasks_can_exit_event|, and waits until all of
  // them have started.
  const auto saturate_pool = [&]() {
    for (size_t index = 0; index < kNumWorkersInWorkerPool; ++index) {
      task_started_events.push_back(MakeUnique<WaitableEvent>(
          WaitableEvent::ResetPolicy::MANUAL,
          WaitableEvent::InitialState::NOT_SIGNALED));
      task_runner->PostTask(
          FROM_HERE, Bind(&SignalAndWaitEvent,
                          Unretained(task_started_events.back().get()),
                          Unretained(&tasks_can_exit_event)));
    }
    for (const auto& started : task_started_events)
      started->Wait();
  };

  // Post tasks to saturate the pool.
  saturate_pool();

  // Allow tasks to complete their execution and wait to allow workers to
  // detach.
  tasks_can_exit_event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();
  PlatformThread::Sleep(kReclaimTimeForDetachTests + kExtraTimeToWaitForDetach);

  // Wake up SchedulerWorkers by posting tasks. They should record the
  // TaskScheduler.NumTasksBetweenWaits.* histogram on wake up.
  tasks_can_exit_event.Reset();
  task_started_events.clear();
  saturate_pool();

  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();

  // Verify that counts were recorded to the histogram as expected.
  // - The "0" bucket has a count of at least 1 because the SchedulerWorker on
  //   top of the idle stack isn't allowed to detach when its sleep timeout
  //   expires. Instead, it waits on its WaitableEvent again without running a
  //   task. The count may be higher than 1 because of spurious wake ups before
  //   the sleep timeout expires.
  EXPECT_GE(histogram->SnapshotSamples()->GetCount(0), 1);
  // - The "1" bucket has a count of |kNumWorkersInWorkerPool| because each
  //   SchedulerWorker ran a task before waiting on its WaitableEvent at the
  //   beginning of the test.
  EXPECT_EQ(static_cast<int>(kNumWorkersInWorkerPool),
            histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));

  // Release the second round of tasks and let the pool settle.
  tasks_can_exit_event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();
  worker_pool_->DisallowWorkerDetachmentForTesting();
}
// Checks that a worker which ran exactly 3 tasks before detaching records a
// single sample of 3 in the NumTasksBeforeDetach histogram.
TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeDetach) {
  CreateAndStartWorkerPool(kReclaimTimeForDetachTests, kNumWorkersInWorkerPool);

  auto histogrammed_thread_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(
          TaskTraits().WithBaseSyncPrimitives());

  // Post 3 tasks and hold the thread for idle thread stack ordering.
  // This test assumes |histogrammed_thread_task_runner| gets assigned the same
  // thread for each of its tasks.
  PlatformThreadRef thread_ref;
  // Task 1: records the thread it runs on.
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE, Bind(
                     [](PlatformThreadRef* thread_ref) {
                       ASSERT_TRUE(thread_ref);
                       *thread_ref = PlatformThread::CurrentRef();
                     },
                     Unretained(&thread_ref)));
  // Task 2: checks that it runs on the thread recorded by task 1.
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE, Bind(
                     [](PlatformThreadRef* thread_ref) {
                       ASSERT_FALSE(thread_ref->is_null());
                       EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
                     },
                     Unretained(&thread_ref)));

  WaitableEvent detach_thread_running(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent detach_thread_continue(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  // Task 3: same check, then holds the thread until |detach_thread_continue|
  // is signaled.
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE,
      Bind(
          [](PlatformThreadRef* thread_ref,
             WaitableEvent* detach_thread_running,
             WaitableEvent* detach_thread_continue) {
            ASSERT_FALSE(thread_ref->is_null());
            EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
            detach_thread_running->Signal();
            detach_thread_continue->Wait();
          },
          Unretained(&thread_ref), Unretained(&detach_thread_running),
          Unretained(&detach_thread_continue)));

  detach_thread_running.Wait();

  // To allow the SchedulerWorker associated with
  // |histogrammed_thread_task_runner| to detach, make sure it isn't on top of
  // the idle stack by waking up another SchedulerWorker via
  // |task_runner_for_top_idle|. |histogrammed_thread_task_runner| should
  // release and go idle first and then |task_runner_for_top_idle| should
  // release and go idle. This allows the SchedulerWorker associated with
  // |histogrammed_thread_task_runner| to detach.
  WaitableEvent top_idle_thread_running(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  WaitableEvent top_idle_thread_continue(
      WaitableEvent::ResetPolicy::MANUAL,
      WaitableEvent::InitialState::NOT_SIGNALED);
  auto task_runner_for_top_idle =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(
          TaskTraits().WithBaseSyncPrimitives());
  // |thread_ref| is bound by value here: it was written by task 1, which ran
  // before task 3 signaled |detach_thread_running|.
  task_runner_for_top_idle->PostTask(
      FROM_HERE, Bind(
                     [](PlatformThreadRef thread_ref,
                        WaitableEvent* top_idle_thread_running,
                        WaitableEvent* top_idle_thread_continue) {
                       ASSERT_FALSE(thread_ref.is_null());
                       EXPECT_NE(thread_ref, PlatformThread::CurrentRef())
                           << "Worker reused. Thread will not detach and the "
                              "histogram value will be wrong.";
                       top_idle_thread_running->Signal();
                       top_idle_thread_continue->Wait();
                     },
                     thread_ref, Unretained(&top_idle_thread_running),
                     Unretained(&top_idle_thread_continue)));
  top_idle_thread_running.Wait();
  detach_thread_continue.Signal();

  // Wait for the thread processing the |histogrammed_thread_task_runner| work
  // to go to the idle stack.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  top_idle_thread_continue.Signal();

  // Allow the thread processing the |histogrammed_thread_task_runner| work to
  // detach. Sleep for the reclaim time plus the shared slack constant, like
  // the other detach tests in this file, rather than twice the reclaim time.
  PlatformThread::Sleep(kReclaimTimeForDetachTests + kExtraTimeToWaitForDetach);
  worker_pool_->WaitForAllWorkersIdleForTesting();
  worker_pool_->DisallowWorkerDetachmentForTesting();

  // Verify that counts were recorded to the histogram as expected: exactly one
  // sample of 3 and nothing in the other probed values.
  const auto* histogram = worker_pool_->num_tasks_before_detach_histogram();
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(2));
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(4));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(5));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(6));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
}
namespace {
// Re-enqueue callback for worker pools that are not expected to re-enqueue any
// sequence; records a test failure if it is ever invoked. |sequence| is unused.
void NotReachedReEnqueueSequenceCallback(scoped_refptr<Sequence> sequence) {
ADD_FAILURE()
<< "Unexpected invocation of NotReachedReEnqueueSequenceCallback.";
}
} // namespace
// Verify that a pool started with the LAZY standby policy has no alive worker
// right after Start().
TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitLazy) {
  TaskTracker task_tracker;
  DelayedTaskManager delayed_task_manager(
      make_scoped_refptr(new TestSimpleTaskRunner));
  auto worker_pool = MakeUnique<SchedulerWorkerPoolImpl>(
      "LazyPolicyWorkerPool", ThreadPriority::NORMAL,
      Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker,
      &delayed_task_manager);
  // Check the pointer before it is first dereferenced by Start().
  ASSERT_TRUE(worker_pool);
  worker_pool->Start(SchedulerWorkerPoolParams(StandbyThreadPolicy::LAZY, 8U,
                                               TimeDelta::Max()));
  EXPECT_EQ(0U, worker_pool->NumberOfAliveWorkersForTesting());
  worker_pool->JoinForTesting();
}
// Verify that a pool started with the ONE standby policy has exactly one alive
// worker right after Start().
TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) {
  TaskTracker task_tracker;
  DelayedTaskManager delayed_task_manager(
      make_scoped_refptr(new TestSimpleTaskRunner));
  auto worker_pool = MakeUnique<SchedulerWorkerPoolImpl>(
      "OnePolicyWorkerPool", ThreadPriority::NORMAL,
      Bind(&NotReachedReEnqueueSequenceCallback), &task_tracker,
      &delayed_task_manager);
  // Check the pointer before it is first dereferenced by Start().
  ASSERT_TRUE(worker_pool);
  worker_pool->Start(SchedulerWorkerPoolParams(StandbyThreadPolicy::ONE, 8U,
                                               TimeDelta::Max()));
  EXPECT_EQ(1U, worker_pool->NumberOfAliveWorkersForTesting());
  worker_pool->JoinForTesting();
}
} // namespace internal
} // namespace base