blob: d7a5f64c06112393b371a5b9f5b1a7935915f70b [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/workers/experimental/thread_pool.h"
#include "services/service_manager/public/cpp/interface_provider.h"
#include "third_party/blink/public/platform/dedicated_worker_factory.mojom-blink.h"
#include "third_party/blink/renderer/core/dom/document.h"
#include "third_party/blink/renderer/core/frame/local_frame.h"
#include "third_party/blink/renderer/core/origin_trials/origin_trial_context.h"
#include "third_party/blink/renderer/core/workers/global_scope_creation_params.h"
#include "third_party/blink/renderer/core/workers/threaded_messaging_proxy_base.h"
#include "third_party/blink/renderer/core/workers/threaded_object_proxy_base.h"
namespace blink {
class ThreadPoolObjectProxy final : public ThreadedObjectProxyBase {
public:
ThreadPoolObjectProxy(ThreadPoolMessagingProxy* messaging_proxy,
ParentExecutionContextTaskRunners* task_runners)
: ThreadedObjectProxyBase(task_runners),
messaging_proxy_(messaging_proxy) {}
~ThreadPoolObjectProxy() override = default;
CrossThreadWeakPersistent<ThreadedMessagingProxyBase> MessagingProxyWeakPtr()
override {
return messaging_proxy_;
}
private:
CrossThreadWeakPersistent<ThreadPoolMessagingProxy> messaging_proxy_;
};
class ThreadPoolMessagingProxy final : public ThreadedMessagingProxyBase {
public:
explicit ThreadPoolMessagingProxy(ExecutionContext* context)
: ThreadedMessagingProxyBase(context) {
object_proxy_ = std::make_unique<ThreadPoolObjectProxy>(
this, GetParentExecutionContextTaskRunners());
}
~ThreadPoolMessagingProxy() override = default;
void StartWorker(std::unique_ptr<GlobalScopeCreationParams> creation_params) {
InitializeWorkerThread(std::move(creation_params),
WorkerBackingThreadStartupData::CreateDefault());
}
std::unique_ptr<WorkerThread> CreateWorkerThread() override {
return std::make_unique<ThreadPoolThread>(
GetExecutionContext(), *object_proxy_.get(), ThreadPoolThread::kWorker);
}
ThreadPoolThread* GetWorkerThread() const {
return static_cast<ThreadPoolThread*>(
ThreadedMessagingProxyBase::GetWorkerThread());
}
private:
std::unique_ptr<ThreadPoolObjectProxy> object_proxy_;
};
service_manager::mojom::blink::InterfaceProviderPtrInfo
ConnectToWorkerInterfaceProviderForThreadPool(
ExecutionContext* execution_context,
scoped_refptr<const SecurityOrigin> script_origin) {
// TODO(japhet): Implement a proper factory.
mojom::blink::DedicatedWorkerFactoryPtr worker_factory;
execution_context->GetInterfaceProvider()->GetInterface(&worker_factory);
service_manager::mojom::blink::InterfaceProviderPtrInfo
interface_provider_ptr;
worker_factory->CreateDedicatedWorker(
script_origin, mojo::MakeRequest(&interface_provider_ptr));
return interface_provider_ptr;
}
const char ThreadPool::kSupplementName[] = "ThreadPool";
ThreadPool* ThreadPool::From(Document& document) {
ThreadPool* thread_pool = Supplement<Document>::From<ThreadPool>(document);
if (!thread_pool) {
thread_pool = new ThreadPool(document);
Supplement<Document>::ProvideTo(document, thread_pool);
}
return thread_pool;
}
static const size_t kMaxThreadCount = 4;
ThreadPool::ThreadPool(Document& document)
: Supplement<Document>(document), ContextLifecycleObserver(&document) {}
ThreadPool::~ThreadPool() {
DCHECK(IsMainThread());
for (auto proxy : thread_proxies_) {
proxy->ParentObjectDestroyed();
}
}
ThreadPoolThread* ThreadPool::CreateNewThread() {
DCHECK(IsMainThread());
DCHECK_LT(thread_proxies_.size(), kMaxThreadCount);
base::UnguessableToken devtools_worker_token =
GetFrame() ? GetFrame()->GetDevToolsFrameToken()
: base::UnguessableToken::Create();
ExecutionContext* context = GetExecutionContext();
ThreadPoolMessagingProxy* proxy = new ThreadPoolMessagingProxy(context);
std::unique_ptr<WorkerSettings> settings =
std::make_unique<WorkerSettings>(GetFrame()->GetSettings());
// WebWorkerFetchContext is provided later in
// ThreadedMessagingProxyBase::InitializeWorkerThread().
proxy->StartWorker(std::make_unique<GlobalScopeCreationParams>(
context->Url(), mojom::ScriptType::kClassic, context->UserAgent(),
nullptr /* web_worker_fetch_context */,
context->GetContentSecurityPolicy()->Headers(), kReferrerPolicyDefault,
context->GetSecurityOrigin(), context->IsSecureContext(),
context->GetHttpsState(), WorkerClients::Create(),
context->GetSecurityContext().AddressSpace(),
OriginTrialContext::GetTokens(context).get(), devtools_worker_token,
std::move(settings), kV8CacheOptionsDefault,
nullptr /* worklet_module_responses_map */,
ConnectToWorkerInterfaceProviderForThreadPool(
context, context->GetSecurityOrigin())));
thread_proxies_.insert(proxy);
return proxy->GetWorkerThread();
}
ThreadPoolThread* ThreadPool::GetLeastBusyThread() {
DCHECK(IsMainThread());
ThreadPoolThread* least_busy_thread = nullptr;
size_t lowest_task_count = std::numeric_limits<std::size_t>::max();
for (auto proxy : thread_proxies_) {
ThreadPoolThread* current_thread = proxy->GetWorkerThread();
size_t current_task_count = current_thread->GetTasksInProgressCount();
// If there's an idle thread, use it.
if (current_task_count == 0)
return current_thread;
if (current_task_count < lowest_task_count) {
least_busy_thread = current_thread;
lowest_task_count = current_task_count;
}
}
if (thread_proxies_.size() == kMaxThreadCount)
return least_busy_thread;
return CreateNewThread();
}
void ThreadPool::ContextDestroyed(ExecutionContext*) {
DCHECK(IsMainThread());
DCHECK(GetExecutionContext()->IsContextThread());
for (auto proxy : thread_proxies_) {
proxy->TerminateGlobalScope();
}
}
void ThreadPool::Trace(blink::Visitor* visitor) {
Supplement<Document>::Trace(visitor);
ContextLifecycleObserver::Trace(visitor);
visitor->Trace(thread_proxies_);
}
} // namespace blink