blob: 567771f2df4b46dfe3ef7009e5493abb1307db7c [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/devtools_bridge/session_dependency_factory.h"
#include "base/bind.h"
#include "base/location.h"
#include "base/task_runner.h"
#include "base/threading/thread.h"
#include "components/devtools_bridge/abstract_data_channel.h"
#include "components/devtools_bridge/abstract_peer_connection.h"
#include "components/devtools_bridge/rtc_configuration.h"
#include "third_party/libjingle/source/talk/app/webrtc/mediaconstraintsinterface.h"
#include "third_party/libjingle/source/talk/app/webrtc/peerconnectioninterface.h"
#include "third_party/webrtc/base/bind.h"
#include "third_party/webrtc/base/messagehandler.h"
#include "third_party/webrtc/base/messagequeue.h"
#include "third_party/webrtc/base/ssladapter.h"
#include "third_party/webrtc/base/thread.h"
namespace devtools_bridge {
class RTCConfiguration::Impl
: public RTCConfiguration,
public webrtc::PeerConnectionInterface::RTCConfiguration {
public:
void AddIceServer(const std::string& uri,
const std::string& username,
const std::string& credential) override {
webrtc::PeerConnectionInterface::IceServer server;
server.uri = uri;
server.username = username;
server.password = credential;
servers.push_back(server);
}
const Impl& impl() const override {
return *this;
}
private:
webrtc::PeerConnectionInterface::RTCConfiguration base_;
};
namespace {
template <typename T>
void CheckedRelease(rtc::scoped_refptr<T>* ptr) {
CHECK_EQ(0, ptr->release()->Release());
}
class MediaConstraints
: public webrtc::MediaConstraintsInterface {
public:
~MediaConstraints() override {}
const Constraints& GetMandatory() const override { return mandatory_; }
const Constraints& GetOptional() const override { return optional_; }
void AddMandatory(const std::string& key, const std::string& value) {
mandatory_.push_back(Constraint(key, value));
}
private:
Constraints mandatory_;
Constraints optional_;
};
/**
* Posts tasks on signaling thread. If stopped (when SesseionDependencyFactry
* is destroying) ignores posted tasks.
*/
class SignalingThreadTaskRunner : public base::TaskRunner,
private rtc::MessageHandler {
public:
explicit SignalingThreadTaskRunner(rtc::Thread* thread) : thread_(thread) {}
bool PostDelayedTask(const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) override {
DCHECK(delay.ToInternalValue() == 0);
rtc::CritScope scope(&critical_section_);
if (thread_)
thread_->Send(this, 0, new Task(task));
return true;
}
bool RunsTasksOnCurrentThread() const override {
rtc::CritScope scope(&critical_section_);
return thread_ != NULL && thread_->IsCurrent();
}
void Stop() {
rtc::CritScope scope(&critical_section_);
thread_ = NULL;
}
private:
typedef rtc::TypedMessageData<base::Closure> Task;
~SignalingThreadTaskRunner() override {}
void OnMessage(rtc::Message* msg) override {
static_cast<Task*>(msg->pdata)->data().Run();
}
mutable rtc::CriticalSection critical_section_;
rtc::Thread* thread_; // Guarded by |critical_section_|.
};
class DataChannelObserverImpl : public webrtc::DataChannelObserver {
public:
DataChannelObserverImpl(
webrtc::DataChannelInterface* data_channel,
scoped_ptr<AbstractDataChannel::Observer> observer)
: data_channel_(data_channel),
observer_(observer.Pass()) {
}
void InitState() {
open_ = data_channel_->state() == webrtc::DataChannelInterface::kOpen;
}
void OnStateChange() override {
bool open = data_channel_->state() == webrtc::DataChannelInterface::kOpen;
if (open == open_) return;
open_ = open;
if (open) {
observer_->OnOpen();
} else {
observer_->OnClose();
}
}
void OnMessage(const webrtc::DataBuffer& buffer) override {
observer_->OnMessage(buffer.data.data(), buffer.size());
}
private:
webrtc::DataChannelInterface* const data_channel_;
scoped_ptr<AbstractDataChannel::Observer> const observer_;
bool open_;
};
/**
* Thread-safe view on AbstractDataChannel.
*/
class DataChannelProxyImpl : public AbstractDataChannel::Proxy {
public:
DataChannelProxyImpl(
SessionDependencyFactory* factory,
rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel)
: data_channel_(data_channel),
signaling_thread_task_runner_(
factory->signaling_thread_task_runner()) {
}
void StopOnSignalingThread() {
data_channel_ = NULL;
}
void SendBinaryMessage(const void* data, size_t length) override {
auto buffer = make_scoped_ptr(new webrtc::DataBuffer(rtc::Buffer(), true));
buffer->data.SetData(data, length);
signaling_thread_task_runner_->PostTask(
FROM_HERE, base::Bind(
&DataChannelProxyImpl::SendMessageOnSignalingThread,
this,
base::Passed(&buffer)));
}
void Close() override {
signaling_thread_task_runner_->PostTask(
FROM_HERE, base::Bind(&DataChannelProxyImpl::CloseOnSignalingThread,
this));
}
private:
~DataChannelProxyImpl() override {}
void SendMessageOnSignalingThread(scoped_ptr<webrtc::DataBuffer> message) {
if (data_channel_ != NULL)
data_channel_->Send(*message);
}
void CloseOnSignalingThread() {
if (data_channel_ != NULL)
data_channel_->Close();
}
// Accessed on signaling thread.
rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
const scoped_refptr<base::TaskRunner> signaling_thread_task_runner_;
};
class DataChannelImpl : public AbstractDataChannel {
public:
DataChannelImpl(
SessionDependencyFactory* factory,
rtc::Thread* const signaling_thread,
rtc::scoped_refptr<webrtc::DataChannelInterface> impl)
: factory_(factory),
signaling_thread_(signaling_thread),
impl_(impl) {
}
~DataChannelImpl() override {
if (proxy_.get()) {
signaling_thread_->Invoke<void>(rtc::Bind(
&DataChannelProxyImpl::StopOnSignalingThread, proxy_.get()));
}
}
void RegisterObserver(scoped_ptr<Observer> observer) override {
observer_.reset(new DataChannelObserverImpl(impl_.get(), observer.Pass()));
signaling_thread_->Invoke<void>(rtc::Bind(
&DataChannelImpl::RegisterObserverOnSignalingThread, this));
}
void UnregisterObserver() override {
DCHECK(observer_.get() != NULL);
impl_->UnregisterObserver();
observer_.reset();
}
void SendBinaryMessage(void* data, size_t length) override {
SendMessage(data, length, true);
}
void SendTextMessage(void* data, size_t length) override {
SendMessage(data, length, false);
}
void SendMessage(void* data, size_t length, bool is_binary) {
impl_->Send(webrtc::DataBuffer(rtc::Buffer(data, length), is_binary));
}
void Close() override {
impl_->Close();
}
scoped_refptr<Proxy> proxy() override {
if (!proxy_.get())
proxy_ = new DataChannelProxyImpl(factory_, impl_);
return proxy_;
}
private:
void RegisterObserverOnSignalingThread() {
// State initialization and observer registration happen atomically
// if done on the signaling thread (see rtc::Thread::Send).
observer_->InitState();
impl_->RegisterObserver(observer_.get());
}
SessionDependencyFactory* const factory_;
scoped_refptr<DataChannelProxyImpl> proxy_;
rtc::Thread* const signaling_thread_;
scoped_ptr<DataChannelObserverImpl> observer_;
const rtc::scoped_refptr<webrtc::DataChannelInterface> impl_;
};
class PeerConnectionObserverImpl
: public webrtc::PeerConnectionObserver {
public:
PeerConnectionObserverImpl(AbstractPeerConnection::Delegate* delegate)
: delegate_(delegate),
connected_(false) {
}
void OnAddStream(webrtc::MediaStreamInterface* stream) override {}
void OnRemoveStream(webrtc::MediaStreamInterface* stream) override {}
void OnDataChannel(webrtc::DataChannelInterface* data_channel) override {}
void OnRenegotiationNeeded() override {}
void OnSignalingChange(
webrtc::PeerConnectionInterface::SignalingState new_state) override {}
void OnIceConnectionChange(
webrtc::PeerConnectionInterface::IceConnectionState new_state) override {
bool connected =
new_state == webrtc::PeerConnectionInterface::kIceConnectionConnected ||
new_state == webrtc::PeerConnectionInterface::kIceConnectionCompleted;
if (connected != connected_) {
connected_ = connected;
delegate_->OnIceConnectionChange(connected_);
}
}
void OnIceCandidate(const webrtc::IceCandidateInterface* candidate) override {
std::string sdp;
candidate->ToString(&sdp);
delegate_->OnIceCandidate(
candidate->sdp_mid(), candidate->sdp_mline_index(), sdp);
}
private:
AbstractPeerConnection::Delegate* const delegate_;
bool connected_;
};
/**
* Helper object which may outlive PeerConnectionImpl. Provides access
* to the connection and the delegate to operaion callback objects
* in a safe way. Always accessible on the signaling thread.
*/
class PeerConnectionHolder : public rtc::RefCountInterface {
public:
PeerConnectionHolder(
rtc::Thread* signaling_thread,
webrtc::PeerConnectionInterface* connection,
AbstractPeerConnection::Delegate* delegate)
: signaling_thread_(signaling_thread),
connection_(connection),
delegate_(delegate),
disposed_(false) {
}
~PeerConnectionHolder() override { DCHECK(disposed_); }
void Dispose() {
DCHECK(!IsDisposed());
disposed_ = true;
}
webrtc::PeerConnectionInterface* connection() {
DCHECK(!IsDisposed());
return connection_;
}
AbstractPeerConnection::Delegate* delegate() {
DCHECK(!IsDisposed());
return delegate_;
}
bool IsDisposed() {
DCHECK(signaling_thread_->IsCurrent());
return disposed_;
}
private:
rtc::Thread* const signaling_thread_;
webrtc::PeerConnectionInterface* const connection_;
AbstractPeerConnection::Delegate* const delegate_;
bool disposed_;
};
class CreateAndSetHandler
: public webrtc::CreateSessionDescriptionObserver,
public webrtc::SetSessionDescriptionObserver {
public:
explicit CreateAndSetHandler(
rtc::scoped_refptr<PeerConnectionHolder> holder)
: holder_(holder) {
}
void OnSuccess(webrtc::SessionDescriptionInterface* desc) override {
if (holder_->IsDisposed()) return;
type_ = desc->type();
if (desc->ToString(&description_)) {
holder_->connection()->SetLocalDescription(this, desc);
} else {
OnFailure("Can't serialize session description");
}
}
void OnSuccess() override {
if (holder_->IsDisposed()) return;
if (type_ == webrtc::SessionDescriptionInterface::kOffer) {
holder_->delegate()->OnLocalOfferCreatedAndSetSet(description_);
} else {
DCHECK_EQ(webrtc::SessionDescriptionInterface::kAnswer, type_);
holder_->delegate()->OnLocalAnswerCreatedAndSetSet(description_);
}
}
void OnFailure(const std::string& error) override {
if (holder_->IsDisposed()) return;
holder_->delegate()->OnFailure(error);
}
private:
const rtc::scoped_refptr<PeerConnectionHolder> holder_;
std::string type_;
std::string description_;
};
class SetRemoteDescriptionHandler
: public webrtc::SetSessionDescriptionObserver {
public:
SetRemoteDescriptionHandler(
rtc::scoped_refptr<PeerConnectionHolder> holder)
: holder_(holder) {
}
void OnSuccess() override {
if (holder_->IsDisposed()) return;
holder_->delegate()->OnRemoteDescriptionSet();
}
void OnFailure(const std::string& error) override {
if (holder_->IsDisposed()) return;
holder_->delegate()->OnFailure(error);
}
private:
const rtc::scoped_refptr<PeerConnectionHolder> holder_;
};
class PeerConnectionImpl : public AbstractPeerConnection {
public:
PeerConnectionImpl(
SessionDependencyFactory* const factory,
rtc::Thread* signaling_thread,
rtc::scoped_refptr<webrtc::PeerConnectionInterface> connection,
scoped_ptr<PeerConnectionObserverImpl> observer,
scoped_ptr<AbstractPeerConnection::Delegate> delegate)
: factory_(factory),
holder_(new rtc::RefCountedObject<PeerConnectionHolder>(
signaling_thread, connection.get(), delegate.get())),
signaling_thread_(signaling_thread),
connection_(connection),
observer_(observer.Pass()),
delegate_(delegate.Pass()) {
}
~PeerConnectionImpl() override {
signaling_thread_->Invoke<void>(rtc::Bind(
&PeerConnectionImpl::DisposeOnSignalingThread, this));
}
void CreateAndSetLocalOffer() override {
connection_->CreateOffer(MakeCreateAndSetHandler(), NULL);
}
void CreateAndSetLocalAnswer() override {
connection_->CreateAnswer(MakeCreateAndSetHandler(), NULL);
}
void SetRemoteOffer(const std::string& description) override {
SetRemoteDescription(
webrtc::SessionDescriptionInterface::kOffer, description);
}
void SetRemoteAnswer(const std::string& description) override {
SetRemoteDescription(
webrtc::SessionDescriptionInterface::kAnswer, description);
}
void SetRemoteDescription(
const std::string& type, const std::string& description) {
webrtc::SdpParseError error;
scoped_ptr<webrtc::SessionDescriptionInterface> value(
webrtc::CreateSessionDescription(type, description, &error));
if (value == NULL) {
OnParseError(error);
return;
}
// Takes ownership on |value|.
connection_->SetRemoteDescription(
new rtc::RefCountedObject<SetRemoteDescriptionHandler>(holder_),
value.release());
}
void AddIceCandidate(const std::string& sdp_mid,
int sdp_mline_index,
const std::string& sdp) override {
webrtc::SdpParseError error;
auto candidate = webrtc::CreateIceCandidate(
sdp_mid, sdp_mline_index, sdp, &error);
if (candidate == NULL) {
OnParseError(error);
return;
}
// Doesn't takes ownership.
connection_->AddIceCandidate(candidate);
delete candidate;
}
scoped_ptr<AbstractDataChannel> CreateDataChannel(int channelId) override {
webrtc::DataChannelInit init;
init.ordered = true;
init.negotiated = true;
init.id = channelId;
return make_scoped_ptr(new DataChannelImpl(
factory_,
signaling_thread_,
connection_->CreateDataChannel("", &init)));
}
private:
webrtc::CreateSessionDescriptionObserver* MakeCreateAndSetHandler() {
return new rtc::RefCountedObject<CreateAndSetHandler>(holder_);
}
void DisposeOnSignalingThread() {
DCHECK(signaling_thread_->IsCurrent());
CheckedRelease(&connection_);
holder_->Dispose();
}
void OnParseError(const webrtc::SdpParseError& error) {
// TODO(serya): Send on signaling thread.
}
SessionDependencyFactory* const factory_;
const rtc::scoped_refptr<PeerConnectionHolder> holder_;
rtc::Thread* const signaling_thread_;
rtc::scoped_refptr<webrtc::PeerConnectionInterface> connection_;
const scoped_ptr<PeerConnectionObserverImpl> observer_;
const scoped_ptr<AbstractPeerConnection::Delegate> delegate_;
};
class SessionDependencyFactoryImpl : public SessionDependencyFactory {
public:
SessionDependencyFactoryImpl(
const base::Closure& cleanup_on_signaling_thread)
: cleanup_on_signaling_thread_(cleanup_on_signaling_thread) {
signaling_thread_.SetName("signaling_thread", NULL);
signaling_thread_.Start();
worker_thread_.SetName("worker_thread", NULL);
worker_thread_.Start();
factory_ = webrtc::CreatePeerConnectionFactory(
&worker_thread_, &signaling_thread_, NULL, NULL, NULL);
}
~SessionDependencyFactoryImpl() override {
if (signaling_thread_task_runner_.get())
signaling_thread_task_runner_->Stop();
signaling_thread_.Invoke<void>(rtc::Bind(
&SessionDependencyFactoryImpl::DisposeOnSignalingThread, this));
}
scoped_ptr<AbstractPeerConnection> CreatePeerConnection(
scoped_ptr<RTCConfiguration> config,
scoped_ptr<AbstractPeerConnection::Delegate> delegate) override {
auto observer = make_scoped_ptr(
new PeerConnectionObserverImpl(delegate.get()));
MediaConstraints constraints;
constraints.AddMandatory(
MediaConstraints::kEnableDtlsSrtp, MediaConstraints::kValueTrue);
auto connection = factory_->CreatePeerConnection(
config->impl(), &constraints, NULL, NULL, observer.get());
return make_scoped_ptr(new PeerConnectionImpl(
this, &signaling_thread_, connection, observer.Pass(),
delegate.Pass()));
}
scoped_refptr<base::TaskRunner> signaling_thread_task_runner() override {
if (!signaling_thread_task_runner_.get()) {
signaling_thread_task_runner_ =
new SignalingThreadTaskRunner(&signaling_thread_);
}
return signaling_thread_task_runner_;
}
scoped_refptr<base::TaskRunner> io_thread_task_runner() override {
if (!io_thread_.get()) {
io_thread_.reset(new base::Thread("devtools bridge IO thread"));
base::Thread::Options options;
options.message_loop_type = base::MessageLoop::TYPE_IO;
CHECK(io_thread_->StartWithOptions(options));
}
return io_thread_->task_runner();
}
private:
void DisposeOnSignalingThread() {
DCHECK(signaling_thread_.IsCurrent());
CheckedRelease(&factory_);
if (!cleanup_on_signaling_thread_.is_null())
cleanup_on_signaling_thread_.Run();
}
scoped_ptr<base::Thread> io_thread_;
scoped_refptr<SignalingThreadTaskRunner> signaling_thread_task_runner_;
base::Closure cleanup_on_signaling_thread_;
rtc::Thread signaling_thread_;
rtc::Thread worker_thread_;
rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory_;
};
} // namespace
// RTCCOnfiguration
// static
scoped_ptr<RTCConfiguration> RTCConfiguration::CreateInstance() {
return make_scoped_ptr(new RTCConfiguration::Impl());
}
// SessionDependencyFactory
// static
bool SessionDependencyFactory::InitializeSSL() {
return rtc::InitializeSSL();
}
// static
bool SessionDependencyFactory::CleanupSSL() {
return rtc::CleanupSSL();
}
// static
scoped_ptr<SessionDependencyFactory> SessionDependencyFactory::CreateInstance(
const base::Closure& cleanup_on_signaling_thread) {
return make_scoped_ptr(new SessionDependencyFactoryImpl(
cleanup_on_signaling_thread));
}
} // namespace devtools_bridge