| #include "third_party/blink/renderer/platform/scheduler/test/fuzzer/thread_manager.h" |
| |
| #include <algorithm> |
| |
| #include "base/task/sequence_manager/test/test_task_queue.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "third_party/blink/renderer/platform/scheduler/test/fuzzer/thread_pool_manager.h" |
| |
| namespace base { |
| namespace sequence_manager { |
| |
| namespace { |
| |
| TaskQueue::QueuePriority ToTaskQueuePriority( |
| SequenceManagerTestDescription::QueuePriority priority) { |
| static_assert(TaskQueue::kQueuePriorityCount == 6, |
| "Number of task queue priorities has changed in " |
| "TaskQueue::QueuePriority."); |
| |
| switch (priority) { |
| case SequenceManagerTestDescription::BEST_EFFORT: |
| return TaskQueue::kBestEffortPriority; |
| case SequenceManagerTestDescription::LOW: |
| return TaskQueue::kLowPriority; |
| case SequenceManagerTestDescription::UNDEFINED: |
| case SequenceManagerTestDescription::NORMAL: |
| return TaskQueue::kNormalPriority; |
| case SequenceManagerTestDescription::HIGH: |
| return TaskQueue::kHighPriority; |
| case SequenceManagerTestDescription::HIGHEST: |
| return TaskQueue::kHighestPriority; |
| case SequenceManagerTestDescription::CONTROL: |
| return TaskQueue::kControlPriority; |
| } |
| } |
| |
| } // namespace |
| |
| ThreadManager::ThreadManager(TimeTicks initial_time, |
| SequenceManagerFuzzerProcessor* processor) |
| : processor_(processor) { |
| DCHECK(processor_); |
| |
| test_task_runner_ = WrapRefCounted( |
| new TestMockTimeTaskRunner(TestMockTimeTaskRunner::Type::kBoundToThread)); |
| |
| DCHECK(!(initial_time - TimeTicks()).is_zero()) |
| << "A zero clock is not allowed as empty TimeTicks have a special value " |
| "(i.e. base::TimeTicks::is_null())"; |
| |
| test_task_runner_->AdvanceMockTickClock(initial_time - TimeTicks()); |
| |
| manager_ = |
| SequenceManagerForTest::Create(nullptr, ThreadTaskRunnerHandle::Get(), |
| test_task_runner_->GetMockTickClock()); |
| |
| TaskQueue::Spec spec = TaskQueue::Spec("default_task_queue"); |
| task_queues_.emplace_back(std::make_unique<TaskQueueWithVoters>( |
| manager_->CreateTaskQueueWithType<TestTaskQueue>(spec))); |
| } |
| |
| ThreadManager::~ThreadManager() = default; |
| |
| TimeTicks ThreadManager::NowTicks() { |
| return test_task_runner_->GetMockTickClock()->NowTicks(); |
| } |
| |
| TimeDelta ThreadManager::NextPendingTaskDelay() { |
| return std::max(TimeDelta::FromMilliseconds(0), |
| test_task_runner_->NextPendingTaskDelay()); |
| } |
| |
| void ThreadManager::AdvanceMockTickClock(TimeDelta delta) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| return test_task_runner_->AdvanceMockTickClock(delta); |
| } |
| |
| void ThreadManager::ExecuteThread( |
| const google::protobuf::RepeatedPtrField< |
| SequenceManagerTestDescription::Action>& initial_thread_actions) { |
| for (const auto& initial_thread_action : initial_thread_actions) { |
| RunAction(initial_thread_action); |
| } |
| |
| while (NowTicks() < TimeTicks::Max()) { |
| RunLoop().RunUntilIdle(); |
| processor_->thread_pool_manager() |
| ->AdvanceClockSynchronouslyByPendingTaskDelay(this); |
| } |
| |
| RunLoop().RunUntilIdle(); |
| processor_->thread_pool_manager()->ThreadDone(); |
| } |
| |
| void ThreadManager::RunAction( |
| const SequenceManagerTestDescription::Action& action) { |
| if (action.has_create_task_queue()) { |
| ExecuteCreateTaskQueueAction(action.action_id(), |
| action.create_task_queue()); |
| } else if (action.has_set_queue_priority()) { |
| ExecuteSetQueuePriorityAction(action.action_id(), |
| action.set_queue_priority()); |
| } else if (action.has_set_queue_enabled()) { |
| ExecuteSetQueueEnabledAction(action.action_id(), |
| action.set_queue_enabled()); |
| } else if (action.has_create_queue_voter()) { |
| ExecuteCreateQueueVoterAction(action.action_id(), |
| action.create_queue_voter()); |
| } else if (action.has_shutdown_task_queue()) { |
| ExecuteShutdownTaskQueueAction(action.action_id(), |
| action.shutdown_task_queue()); |
| } else if (action.has_cancel_task()) { |
| ExecuteCancelTaskAction(action.action_id(), action.cancel_task()); |
| } else if (action.has_insert_fence()) { |
| ExecuteInsertFenceAction(action.action_id(), action.insert_fence()); |
| } else if (action.has_remove_fence()) { |
| ExecuteRemoveFenceAction(action.action_id(), action.remove_fence()); |
| } else if (action.has_create_thread()) { |
| ExecuteCreateThreadAction(action.action_id(), action.create_thread()); |
| } else if (action.has_cross_thread_post()) { |
| ExecuteCrossThreadPostDelayedTaskAction(action.action_id(), |
| action.cross_thread_post()); |
| } else { |
| ExecutePostDelayedTaskAction(action.action_id(), |
| action.post_delayed_task()); |
| } |
| } |
| |
| void ThreadManager::ExecuteCreateThreadAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::CreateThreadAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kCreateThread, |
| NowTicks()); |
| |
| processor_->thread_pool_manager()->CreateThread( |
| action.initial_thread_actions(), NowTicks()); |
| } |
| |
| void ThreadManager::ExecuteCreateTaskQueueAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::CreateTaskQueueAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kCreateTaskQueue, |
| NowTicks()); |
| |
| TaskQueue::Spec spec = TaskQueue::Spec("test_task_queue"); |
| |
| TestTaskQueue* chosen_task_queue; |
| { |
| AutoLock lock(lock_); |
| task_queues_.emplace_back(std::make_unique<TaskQueueWithVoters>( |
| manager_->CreateTaskQueueWithType<TestTaskQueue>(spec))); |
| chosen_task_queue = task_queues_.back()->queue.get(); |
| } |
| chosen_task_queue->SetQueuePriority( |
| ToTaskQueuePriority(action.initial_priority())); |
| } |
| |
| void ThreadManager::ExecutePostDelayedTaskAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::PostDelayedTaskAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kPostDelayedTask, |
| NowTicks()); |
| |
| PostDelayedTask(action.task_queue_id(), action.delay_ms(), action.task()); |
| } |
| |
| void ThreadManager::ExecuteCrossThreadPostDelayedTaskAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::CrossThreadPostDelayedTaskAction& |
| action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting( |
| &ordered_actions_, action_id, |
| ActionForTest::ActionType::kCrossThreadPostDelayedTask, NowTicks()); |
| |
| processor_->thread_pool_manager() |
| ->GetThreadManagerFor(action.thread_id()) |
| ->PostDelayedTask(action.task_queue_id(), action.delay_ms(), |
| action.task()); |
| } |
| |
| void ThreadManager::PostDelayedTask( |
| uint64_t task_queue_id, |
| uint32_t delay_ms, |
| const SequenceManagerTestDescription::Task& task) { |
| TestTaskQueue* chosen_task_queue = |
| GetTaskQueueFor(task_queue_id)->queue.get(); |
| |
| std::unique_ptr<Task> pending_task = std::make_unique<Task>(this); |
| |
| // TODO(farahcharab) After adding non-nestable/nestable tasks, fix this to |
| // PostNonNestableDelayedTask for the former and PostDelayedTask for the |
| // latter. |
| chosen_task_queue->task_runner()->PostDelayedTask( |
| FROM_HERE, |
| BindOnce(&Task::Execute, pending_task->weak_ptr_factory_.GetWeakPtr(), |
| task), |
| TimeDelta::FromMilliseconds(delay_ms)); |
| |
| { |
| AutoLock lock(lock_); |
| pending_tasks_.push_back(std::move(pending_task)); |
| } |
| } |
| |
| void ThreadManager::ExecuteSetQueuePriorityAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::SetQueuePriorityAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kSetQueuePriority, |
| NowTicks()); |
| |
| TestTaskQueue* chosen_task_queue = |
| GetTaskQueueFor(action.task_queue_id())->queue.get(); |
| chosen_task_queue->SetQueuePriority(ToTaskQueuePriority(action.priority())); |
| } |
| |
| void ThreadManager::ExecuteSetQueueEnabledAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::SetQueueEnabledAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kSetQueueEnabled, |
| NowTicks()); |
| |
| TaskQueueWithVoters* chosen_task_queue = |
| GetTaskQueueFor(action.task_queue_id()); |
| |
| if (chosen_task_queue->voters.empty()) { |
| chosen_task_queue->voters.push_back( |
| chosen_task_queue->queue.get()->CreateQueueEnabledVoter()); |
| } |
| |
| size_t voter_index = action.voter_id() % chosen_task_queue->voters.size(); |
| chosen_task_queue->voters[voter_index]->SetQueueEnabled(action.enabled()); |
| } |
| |
| void ThreadManager::ExecuteCreateQueueVoterAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::CreateQueueVoterAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kCreateQueueVoter, |
| NowTicks()); |
| |
| TaskQueueWithVoters* chosen_task_queue = |
| GetTaskQueueFor(action.task_queue_id()); |
| chosen_task_queue->voters.push_back( |
| chosen_task_queue->queue.get()->CreateQueueEnabledVoter()); |
| } |
| |
| void ThreadManager::ExecuteShutdownTaskQueueAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::ShutdownTaskQueueAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kShutdownTaskQueue, |
| NowTicks()); |
| |
| TestTaskQueue* chosen_task_queue = nullptr; |
| size_t queue_index; |
| { |
| AutoLock lock(lock_); |
| |
| // We always want to have a default task queue. |
| if (task_queues_.size() > 1) { |
| queue_index = action.task_queue_id() % task_queues_.size(); |
| chosen_task_queue = task_queues_[queue_index]->queue.get(); |
| } |
| } |
| |
| if (chosen_task_queue) { |
| chosen_task_queue->ShutdownTaskQueue(); |
| AutoLock lock(lock_); |
| task_queues_.erase(task_queues_.begin() + queue_index); |
| } |
| } |
| |
| void ThreadManager::ExecuteCancelTaskAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::CancelTaskAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kCancelTask, |
| NowTicks()); |
| |
| AutoLock lock(lock_); |
| if (!pending_tasks_.empty()) { |
| size_t task_index = action.task_id() % pending_tasks_.size(); |
| pending_tasks_[task_index]->weak_ptr_factory_.InvalidateWeakPtrs(); |
| |
| // If it is already running, it is a parent task and will be deleted when |
| // it is done. |
| if (!pending_tasks_[task_index]->is_running_) { |
| pending_tasks_.erase(pending_tasks_.begin() + task_index); |
| } |
| } |
| } |
| |
| void ThreadManager::ExecuteInsertFenceAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::InsertFenceAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kInsertFence, |
| NowTicks()); |
| |
| TestTaskQueue* chosen_task_queue = |
| GetTaskQueueFor(action.task_queue_id())->queue.get(); |
| |
| if (action.position() == |
| SequenceManagerTestDescription::InsertFenceAction::NOW) { |
| chosen_task_queue->InsertFence(TaskQueue::InsertFencePosition::kNow); |
| } else { |
| chosen_task_queue->InsertFence( |
| TaskQueue::InsertFencePosition::kBeginningOfTime); |
| } |
| } |
| |
| void ThreadManager::ExecuteRemoveFenceAction( |
| uint64_t action_id, |
| const SequenceManagerTestDescription::RemoveFenceAction& action) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); |
| |
| processor_->LogActionForTesting(&ordered_actions_, action_id, |
| ActionForTest::ActionType::kRemoveFence, |
| NowTicks()); |
| |
| TestTaskQueue* chosen_task_queue = |
| GetTaskQueueFor(action.task_queue_id())->queue.get(); |
| chosen_task_queue->RemoveFence(); |
| } |
| |
| void ThreadManager::ExecuteTask( |
| const SequenceManagerTestDescription::Task& task) { |
| TimeTicks start_time = NowTicks(); |
| |
| // We can limit the depth of the nested post delayed action when processing |
| // the proto. |
| for (const auto& task_action : task.actions()) { |
| // TODO(farahcharab) Add run loop to deal with nested tasks later. So far, |
| // we are assuming tasks are non-nestable. |
| RunAction(task_action); |
| } |
| |
| TimeTicks end_time = NowTicks(); |
| |
| TimeTicks next_time = |
| start_time + |
| std::max(TimeDelta(), TimeDelta::FromMilliseconds(task.duration_ms()) - |
| (end_time - start_time)); |
| |
| while (NowTicks() != next_time) { |
| processor_->thread_pool_manager()->AdvanceClockSynchronouslyToTime( |
| this, next_time); |
| } |
| |
| processor_->LogTaskForTesting(&ordered_tasks_, task.task_id(), start_time, |
| NowTicks()); |
| } |
| |
| void ThreadManager::DeleteTask(Task* task) { |
| AutoLock lock(lock_); |
| size_t i = 0; |
| while (i < pending_tasks_.size() && task != pending_tasks_[i].get()) { |
| i++; |
| } |
| if (i < pending_tasks_.size()) |
| pending_tasks_.erase(pending_tasks_.begin() + i); |
| } |
| |
| TaskQueueWithVoters* ThreadManager::GetTaskQueueFor(uint64_t task_queue_id) { |
| AutoLock lock(lock_); |
| DCHECK(!task_queues_.empty()); |
| return task_queues_[task_queue_id % task_queues_.size()].get(); |
| } |
| |
| const std::vector<SequenceManagerFuzzerProcessor::TaskForTest>& |
| ThreadManager::ordered_tasks() const { |
| return ordered_tasks_; |
| } |
| |
| const std::vector<SequenceManagerFuzzerProcessor::ActionForTest>& |
| ThreadManager::ordered_actions() const { |
| return ordered_actions_; |
| } |
| |
| ThreadManager::Task::Task(ThreadManager* thread_manager) |
| : is_running_(false), |
| thread_manager_(thread_manager), |
| weak_ptr_factory_(this) { |
| DCHECK(thread_manager_); |
| } |
| |
| void ThreadManager::Task::Execute( |
| const SequenceManagerTestDescription::Task& task) { |
| DCHECK_CALLED_ON_VALID_THREAD(thread_manager_->thread_checker_); |
| is_running_ = true; |
| thread_manager_->ExecuteTask(task); |
| thread_manager_->DeleteTask(this); |
| } |
| |
| } // namespace sequence_manager |
| } // namespace base |