blob: 6b755eb52f91cc8d092d3f91f31b1b42b7e84889 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h"
#include <string.h>
#include <algorithm>
#include "third_party/blink/renderer/bindings/core/v8/script_function.h"
#include "third_party/blink/renderer/bindings/core/v8/script_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_uint8_array.h"
#include "third_party/blink/renderer/core/streams/readable_stream_operations.h"
#include "third_party/blink/renderer/platform/bindings/scoped_persistent.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/bindings/v8_binding_macros.h"
#include "third_party/blink/renderer/platform/wtf/assertions.h"
#include "third_party/blink/renderer/platform/wtf/text/wtf_string.h"
#include "v8/include/v8.h"
namespace blink {
class ReadableStreamBytesConsumer::OnFulfilled final : public ScriptFunction {
public:
static v8::Local<v8::Function> CreateFunction(
ScriptState* script_state,
ReadableStreamBytesConsumer* consumer) {
return (new OnFulfilled(script_state, consumer))->BindToV8Function();
}
ScriptValue Call(ScriptValue v) override {
bool done;
v8::Local<v8::Value> item = v.V8Value();
if (!item->IsObject()) {
consumer_->OnRejected();
return ScriptValue();
}
v8::Local<v8::Value> value;
if (!V8UnpackIteratorResult(v.GetScriptState(), item.As<v8::Object>(),
&done)
.ToLocal(&value)) {
consumer_->OnRejected();
return ScriptValue();
}
if (done) {
consumer_->OnReadDone();
return v;
}
if (!value->IsUint8Array()) {
consumer_->OnRejected();
return ScriptValue();
}
consumer_->OnRead(V8Uint8Array::ToImpl(value.As<v8::Object>()));
return v;
}
void Trace(blink::Visitor* visitor) override {
visitor->Trace(consumer_);
ScriptFunction::Trace(visitor);
}
private:
OnFulfilled(ScriptState* script_state, ReadableStreamBytesConsumer* consumer)
: ScriptFunction(script_state), consumer_(consumer) {}
Member<ReadableStreamBytesConsumer> consumer_;
};
class ReadableStreamBytesConsumer::OnRejected final : public ScriptFunction {
public:
static v8::Local<v8::Function> CreateFunction(
ScriptState* script_state,
ReadableStreamBytesConsumer* consumer) {
return (new OnRejected(script_state, consumer))->BindToV8Function();
}
ScriptValue Call(ScriptValue v) override {
consumer_->OnRejected();
return v;
}
void Trace(blink::Visitor* visitor) override {
visitor->Trace(consumer_);
ScriptFunction::Trace(visitor);
}
private:
OnRejected(ScriptState* script_state, ReadableStreamBytesConsumer* consumer)
: ScriptFunction(script_state), consumer_(consumer) {}
Member<ReadableStreamBytesConsumer> consumer_;
};
ReadableStreamBytesConsumer::ReadableStreamBytesConsumer(
ScriptState* script_state,
ScriptValue stream_reader)
: reader_(script_state->GetIsolate(), stream_reader.V8Value()),
script_state_(script_state) {
reader_.SetPhantom();
}
ReadableStreamBytesConsumer::~ReadableStreamBytesConsumer() {}
BytesConsumer::Result ReadableStreamBytesConsumer::BeginRead(
const char** buffer,
size_t* available) {
*buffer = nullptr;
*available = 0;
if (state_ == PublicState::kErrored)
return Result::kError;
if (state_ == PublicState::kClosed)
return Result::kDone;
if (pending_buffer_) {
DCHECK_LE(pending_offset_, pending_buffer_->length());
*buffer = reinterpret_cast<const char*>(pending_buffer_->Data()) +
pending_offset_;
*available = pending_buffer_->length() - pending_offset_;
return Result::kOk;
}
if (!is_reading_) {
is_reading_ = true;
ScriptState::Scope scope(script_state_.get());
ScriptValue reader(script_state_.get(),
reader_.NewLocal(script_state_->GetIsolate()));
// The owner must retain the reader.
DCHECK(!reader.IsEmpty());
ReadableStreamOperations::DefaultReaderRead(script_state_.get(), reader)
.Then(OnFulfilled::CreateFunction(script_state_.get(), this),
OnRejected::CreateFunction(script_state_.get(), this));
}
return Result::kShouldWait;
}
BytesConsumer::Result ReadableStreamBytesConsumer::EndRead(size_t read_size) {
DCHECK(pending_buffer_);
DCHECK_LE(pending_offset_ + read_size, pending_buffer_->length());
pending_offset_ += read_size;
if (pending_offset_ >= pending_buffer_->length()) {
pending_buffer_ = nullptr;
pending_offset_ = 0;
}
return Result::kOk;
}
void ReadableStreamBytesConsumer::SetClient(Client* client) {
DCHECK(!client_);
DCHECK(client);
client_ = client;
}
void ReadableStreamBytesConsumer::ClearClient() {
client_ = nullptr;
}
void ReadableStreamBytesConsumer::Cancel() {
if (state_ == PublicState::kClosed || state_ == PublicState::kErrored)
return;
state_ = PublicState::kClosed;
ClearClient();
reader_.Clear();
}
BytesConsumer::PublicState ReadableStreamBytesConsumer::GetPublicState() const {
return state_;
}
BytesConsumer::Error ReadableStreamBytesConsumer::GetError() const {
return Error("Failed to read from a ReadableStream.");
}
void ReadableStreamBytesConsumer::Trace(blink::Visitor* visitor) {
visitor->Trace(client_);
visitor->Trace(pending_buffer_);
BytesConsumer::Trace(visitor);
}
void ReadableStreamBytesConsumer::Dispose() {
reader_.Clear();
}
void ReadableStreamBytesConsumer::OnRead(DOMUint8Array* buffer) {
DCHECK(is_reading_);
DCHECK(buffer);
DCHECK(!pending_buffer_);
DCHECK(!pending_offset_);
is_reading_ = false;
if (state_ == PublicState::kClosed)
return;
DCHECK_EQ(state_, PublicState::kReadableOrWaiting);
pending_buffer_ = buffer;
if (client_)
client_->OnStateChange();
}
void ReadableStreamBytesConsumer::OnReadDone() {
DCHECK(is_reading_);
DCHECK(!pending_buffer_);
is_reading_ = false;
if (state_ == PublicState::kClosed)
return;
DCHECK_EQ(state_, PublicState::kReadableOrWaiting);
state_ = PublicState::kClosed;
reader_.Clear();
Client* client = client_;
ClearClient();
if (client)
client->OnStateChange();
}
void ReadableStreamBytesConsumer::OnRejected() {
DCHECK(is_reading_);
DCHECK(!pending_buffer_);
is_reading_ = false;
if (state_ == PublicState::kClosed)
return;
DCHECK_EQ(state_, PublicState::kReadableOrWaiting);
state_ = PublicState::kErrored;
reader_.Clear();
Client* client = client_;
ClearClient();
if (client)
client->OnStateChange();
}
} // namespace blink