| // Copyright 2014 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "third_party/blink/renderer/core/fetch/body_stream_buffer.h" |
| |
| #include <memory> |
| #include "base/auto_reset.h" |
| #include "third_party/blink/renderer/core/execution_context/execution_context.h" |
| #include "third_party/blink/renderer/core/fetch/body.h" |
| #include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream.h" |
| #include "third_party/blink/renderer/core/streams/readable_stream_default_controller_wrapper.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_typed_array.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_code.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_state.h" |
| #include "third_party/blink/renderer/platform/bindings/script_state.h" |
| #include "third_party/blink/renderer/platform/bindings/v8_throw_exception.h" |
| #include "third_party/blink/renderer/platform/blob/blob_data.h" |
| #include "third_party/blink/renderer/platform/network/encoded_form_data.h" |
| #include "third_party/blink/renderer/platform/wtf/assertions.h" |
| #include "third_party/blink/renderer/platform/wtf/functional.h" |
| #include "third_party/blink/renderer/platform/wtf/std_lib_extras.h" |
| |
| namespace blink { |
| |
| class BodyStreamBuffer::LoaderClient final |
| : public GarbageCollectedFinalized<LoaderClient>, |
| public ContextLifecycleObserver, |
| public FetchDataLoader::Client { |
| USING_GARBAGE_COLLECTED_MIXIN(LoaderClient); |
| |
| public: |
| LoaderClient(ExecutionContext* execution_context, |
| BodyStreamBuffer* buffer, |
| FetchDataLoader::Client* client) |
| : ContextLifecycleObserver(execution_context), |
| buffer_(buffer), |
| client_(client) {} |
| |
| void DidFetchDataLoadedBlobHandle( |
| scoped_refptr<BlobDataHandle> blob_data_handle) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedBlobHandle(std::move(blob_data_handle)); |
| } |
| |
| void DidFetchDataLoadedArrayBuffer(DOMArrayBuffer* array_buffer) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedArrayBuffer(array_buffer); |
| } |
| |
| void DidFetchDataLoadedFormData(FormData* form_data) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedFormData(form_data); |
| } |
| |
| void DidFetchDataLoadedString(const String& string) override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedString(string); |
| } |
| |
| void DidFetchDataStartedDataPipe( |
| mojo::ScopedDataPipeConsumerHandle data_pipe) override { |
| client_->DidFetchDataStartedDataPipe(std::move(data_pipe)); |
| } |
| |
| void DidFetchDataLoadedDataPipe() override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedDataPipe(); |
| } |
| |
| void DidFetchDataLoadedCustomFormat() override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadedCustomFormat(); |
| } |
| |
| void DidFetchDataLoadFailed() override { |
| buffer_->EndLoading(); |
| client_->DidFetchDataLoadFailed(); |
| } |
| |
| void Abort() override { NOTREACHED(); } |
| |
| void Trace(blink::Visitor* visitor) override { |
| visitor->Trace(buffer_); |
| visitor->Trace(client_); |
| ContextLifecycleObserver::Trace(visitor); |
| FetchDataLoader::Client::Trace(visitor); |
| } |
| |
| private: |
| void ContextDestroyed(ExecutionContext*) override { buffer_->StopLoading(); } |
| |
| Member<BodyStreamBuffer> buffer_; |
| Member<FetchDataLoader::Client> client_; |
| DISALLOW_COPY_AND_ASSIGN(LoaderClient); |
| }; |
| |
| BodyStreamBuffer::BodyStreamBuffer(ScriptState* script_state, |
| BytesConsumer* consumer, |
| AbortSignal* signal) |
| : UnderlyingSourceBase(script_state), |
| script_state_(script_state), |
| consumer_(consumer), |
| signal_(signal), |
| made_from_readable_stream_(false) { |
| stream_ = |
| ReadableStream::CreateWithCountQueueingStrategy(script_state_, this, 0); |
| stream_broken_ = !stream_; |
| |
| consumer_->SetClient(this); |
| if (signal) { |
| if (signal->aborted()) { |
| Abort(); |
| } else { |
| signal->AddAlgorithm( |
| WTF::Bind(&BodyStreamBuffer::Abort, WrapWeakPersistent(this))); |
| } |
| } |
| OnStateChange(); |
| } |
| |
| BodyStreamBuffer::BodyStreamBuffer(ScriptState* script_state, |
| ReadableStream* stream) |
| : UnderlyingSourceBase(script_state), |
| script_state_(script_state), |
| stream_(stream), |
| signal_(nullptr), |
| made_from_readable_stream_(true) { |
| DCHECK(stream_); |
| } |
| |
| scoped_refptr<BlobDataHandle> BodyStreamBuffer::DrainAsBlobDataHandle( |
| BytesConsumer::BlobSizePolicy policy, |
| ExceptionState& exception_state) { |
| DCHECK(!IsStreamLockedForDCheck(exception_state)); |
| DCHECK(!IsStreamDisturbedForDCheck(exception_state)); |
| const base::Optional<bool> is_closed = IsStreamClosed(exception_state); |
| if (exception_state.HadException() || is_closed.value()) |
| return nullptr; |
| const base::Optional<bool> is_errored = IsStreamErrored(exception_state); |
| if (exception_state.HadException() || is_errored.value()) |
| return nullptr; |
| |
| if (made_from_readable_stream_) |
| return nullptr; |
| |
| scoped_refptr<BlobDataHandle> blob_data_handle = |
| consumer_->DrainAsBlobDataHandle(policy); |
| if (blob_data_handle) { |
| CloseAndLockAndDisturb(exception_state); |
| if (exception_state.HadException()) |
| return nullptr; |
| return blob_data_handle; |
| } |
| return nullptr; |
| } |
| |
| scoped_refptr<EncodedFormData> BodyStreamBuffer::DrainAsFormData( |
| ExceptionState& exception_state) { |
| DCHECK(!IsStreamLockedForDCheck(exception_state)); |
| DCHECK(!IsStreamDisturbedForDCheck(exception_state)); |
| const base::Optional<bool> is_closed = IsStreamClosed(exception_state); |
| if (exception_state.HadException() || is_closed.value()) |
| return nullptr; |
| const base::Optional<bool> is_errored = IsStreamErrored(exception_state); |
| if (exception_state.HadException() || is_errored.value()) |
| return nullptr; |
| |
| if (made_from_readable_stream_) |
| return nullptr; |
| |
| scoped_refptr<EncodedFormData> form_data = consumer_->DrainAsFormData(); |
| if (form_data) { |
| CloseAndLockAndDisturb(exception_state); |
| if (exception_state.HadException()) |
| return nullptr; |
| return form_data; |
| } |
| return nullptr; |
| } |
| |
| void BodyStreamBuffer::StartLoading(FetchDataLoader* loader, |
| FetchDataLoader::Client* client, |
| ExceptionState& exception_state) { |
| DCHECK(!loader_); |
| DCHECK(script_state_->ContextIsValid()); |
| loader_ = loader; |
| if (signal_) { |
| if (signal_->aborted()) { |
| client->Abort(); |
| return; |
| } |
| signal_->AddAlgorithm( |
| WTF::Bind(&FetchDataLoader::Client::Abort, WrapWeakPersistent(client))); |
| } |
| auto* handle = ReleaseHandle(exception_state); |
| if (exception_state.HadException()) |
| return; |
| loader->Start(handle, new LoaderClient(ExecutionContext::From(script_state_), |
| this, client)); |
| } |
| |
| void BodyStreamBuffer::Tee(BodyStreamBuffer** branch1, |
| BodyStreamBuffer** branch2, |
| ExceptionState& exception_state) { |
| DCHECK(!IsStreamLockedForDCheck(exception_state)); |
| DCHECK(!IsStreamDisturbedForDCheck(exception_state)); |
| *branch1 = nullptr; |
| *branch2 = nullptr; |
| |
| if (made_from_readable_stream_) { |
| if (stream_broken_) { |
| // We don't really know what state the stream is in, so throw an exception |
| // rather than making things worse. |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Unsafe to tee stream in unknown state"); |
| return; |
| } |
| ReadableStream* stream1 = nullptr; |
| ReadableStream* stream2 = nullptr; |
| |
| stream_->Tee(script_state_, &stream1, &stream2, exception_state); |
| if (exception_state.HadException()) { |
| stream_broken_ = true; |
| return; |
| } |
| |
| *branch1 = new BodyStreamBuffer(script_state_, stream1); |
| *branch2 = new BodyStreamBuffer(script_state_, stream2); |
| return; |
| } |
| BytesConsumer* dest1 = nullptr; |
| BytesConsumer* dest2 = nullptr; |
| auto* handle = ReleaseHandle(exception_state); |
| if (exception_state.HadException()) { |
| stream_broken_ = true; |
| return; |
| } |
| BytesConsumer::Tee(ExecutionContext::From(script_state_), handle, &dest1, |
| &dest2); |
| *branch1 = new BodyStreamBuffer(script_state_, dest1, signal_); |
| *branch2 = new BodyStreamBuffer(script_state_, dest2, signal_); |
| } |
| |
| ScriptPromise BodyStreamBuffer::pull(ScriptState* script_state) { |
| DCHECK_EQ(script_state, script_state_); |
| if (!consumer_) { |
| // This is a speculative workaround for a crash. See |
| // https://crbug.com/773525. |
| // TODO(yhirano): Remove this branch or have a better comment. |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| if (stream_needs_more_) |
| return ScriptPromise::CastUndefined(script_state); |
| stream_needs_more_ = true; |
| if (!in_process_data_) |
| ProcessData(); |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| ScriptPromise BodyStreamBuffer::Cancel(ScriptState* script_state, |
| ScriptValue reason) { |
| DCHECK_EQ(script_state, script_state_); |
| if (Controller()) |
| Controller()->Close(); |
| CancelConsumer(); |
| return ScriptPromise::CastUndefined(script_state); |
| } |
| |
| void BodyStreamBuffer::OnStateChange() { |
| if (!consumer_ || !GetExecutionContext() || |
| GetExecutionContext()->IsContextDestroyed()) |
| return; |
| |
| switch (consumer_->GetPublicState()) { |
| case BytesConsumer::PublicState::kReadableOrWaiting: |
| break; |
| case BytesConsumer::PublicState::kClosed: |
| Close(); |
| return; |
| case BytesConsumer::PublicState::kErrored: |
| GetError(); |
| return; |
| } |
| ProcessData(); |
| } |
| |
| bool BodyStreamBuffer::HasPendingActivity() const { |
| if (loader_) |
| return true; |
| return UnderlyingSourceBase::HasPendingActivity(); |
| } |
| |
| void BodyStreamBuffer::ContextDestroyed(ExecutionContext* destroyed_context) { |
| CancelConsumer(); |
| UnderlyingSourceBase::ContextDestroyed(destroyed_context); |
| } |
| |
| base::Optional<bool> BodyStreamBuffer::IsStreamReadable( |
| ExceptionState& exception_state) { |
| return BooleanStreamOperation(&ReadableStream::IsReadable, exception_state); |
| } |
| |
| base::Optional<bool> BodyStreamBuffer::IsStreamClosed( |
| ExceptionState& exception_state) { |
| return BooleanStreamOperation(&ReadableStream::IsClosed, exception_state); |
| } |
| |
| base::Optional<bool> BodyStreamBuffer::IsStreamErrored( |
| ExceptionState& exception_state) { |
| return BooleanStreamOperation(&ReadableStream::IsErrored, exception_state); |
| } |
| |
| base::Optional<bool> BodyStreamBuffer::IsStreamLocked( |
| ExceptionState& exception_state) { |
| return BooleanStreamOperation(&ReadableStream::IsLocked, exception_state); |
| } |
| |
| bool BodyStreamBuffer::IsStreamLockedForDCheck( |
| ExceptionState& exception_state) { |
| auto result = IsStreamLocked(exception_state); |
| return !result || *result; |
| } |
| |
| base::Optional<bool> BodyStreamBuffer::IsStreamDisturbed( |
| ExceptionState& exception_state) { |
| return BooleanStreamOperation(&ReadableStream::IsDisturbed, exception_state); |
| } |
| |
| bool BodyStreamBuffer::IsStreamDisturbedForDCheck( |
| ExceptionState& exception_state) { |
| auto result = IsStreamDisturbed(exception_state); |
| return !result || *result; |
| } |
| |
| void BodyStreamBuffer::CloseAndLockAndDisturb(ExceptionState& exception_state) { |
| if (stream_broken_) { |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Body stream has suffered a fatal error and cannot be disturbed"); |
| return; |
| } |
| |
| if (stream_->IsInternalStreamMissing()) { |
| stream_broken_ = true; |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Body stream has been lost and cannot be disturbed"); |
| return; |
| } |
| |
| base::Optional<bool> is_readable = IsStreamReadable(exception_state); |
| if (exception_state.HadException()) |
| return; |
| |
| DCHECK(is_readable.has_value()); |
| if (is_readable.value()) { |
| // Note that the stream cannot be "draining", because it doesn't have |
| // the internal buffer. |
| Close(); |
| } |
| DCHECK(!stream_broken_); |
| |
| stream_->LockAndDisturb(script_state_, exception_state); |
| } |
| |
| bool BodyStreamBuffer::IsAborted() { |
| if (!signal_) |
| return false; |
| return signal_->aborted(); |
| } |
| |
| void BodyStreamBuffer::Trace(blink::Visitor* visitor) { |
| visitor->Trace(script_state_); |
| visitor->Trace(stream_); |
| visitor->Trace(consumer_); |
| visitor->Trace(loader_); |
| visitor->Trace(signal_); |
| UnderlyingSourceBase::Trace(visitor); |
| } |
| |
| void BodyStreamBuffer::Abort() { |
| if (!Controller()) { |
| DCHECK(!GetExecutionContext()); |
| DCHECK(!consumer_); |
| return; |
| } |
| Controller()->GetError(DOMException::Create(DOMExceptionCode::kAbortError)); |
| CancelConsumer(); |
| } |
| |
| void BodyStreamBuffer::Close() { |
| // Close() can be called during construction, in which case Controller() |
| // will not be set yet. |
| if (Controller()) |
| Controller()->Close(); |
| CancelConsumer(); |
| } |
| |
| void BodyStreamBuffer::GetError() { |
| { |
| ScriptState::Scope scope(script_state_); |
| Controller()->GetError(V8ThrowException::CreateTypeError( |
| script_state_->GetIsolate(), "network error")); |
| } |
| CancelConsumer(); |
| } |
| |
| void BodyStreamBuffer::CancelConsumer() { |
| if (consumer_) { |
| consumer_->Cancel(); |
| consumer_ = nullptr; |
| } |
| } |
| |
| void BodyStreamBuffer::ProcessData() { |
| DCHECK(consumer_); |
| DCHECK(!in_process_data_); |
| |
| base::AutoReset<bool> auto_reset(&in_process_data_, true); |
| while (stream_needs_more_) { |
| const char* buffer = nullptr; |
| size_t available = 0; |
| auto result = consumer_->BeginRead(&buffer, &available); |
| if (result == BytesConsumer::Result::kShouldWait) |
| return; |
| DOMUint8Array* array = nullptr; |
| if (result == BytesConsumer::Result::kOk) { |
| array = |
| DOMUint8Array::Create(reinterpret_cast<const unsigned char*>(buffer), |
| SafeCast<uint32_t>(available)); |
| result = consumer_->EndRead(available); |
| } |
| switch (result) { |
| case BytesConsumer::Result::kOk: |
| case BytesConsumer::Result::kDone: |
| if (array) { |
| // Clear m_streamNeedsMore in order to detect a pull call. |
| stream_needs_more_ = false; |
| Controller()->Enqueue(array); |
| } |
| if (result == BytesConsumer::Result::kDone) { |
| Close(); |
| return; |
| } |
| // If m_streamNeedsMore is true, it means that pull is called and |
| // the stream needs more data even if the desired size is not |
| // positive. |
| if (!stream_needs_more_) |
| stream_needs_more_ = Controller()->DesiredSize() > 0; |
| break; |
| case BytesConsumer::Result::kShouldWait: |
| NOTREACHED(); |
| return; |
| case BytesConsumer::Result::kError: |
| GetError(); |
| return; |
| } |
| } |
| } |
| |
| void BodyStreamBuffer::EndLoading() { |
| DCHECK(loader_); |
| loader_ = nullptr; |
| } |
| |
| void BodyStreamBuffer::StopLoading() { |
| if (!loader_) |
| return; |
| loader_->Cancel(); |
| loader_ = nullptr; |
| } |
| |
| base::Optional<bool> BodyStreamBuffer::BooleanStreamOperation( |
| base::Optional<bool> (ReadableStream::*predicate)(ScriptState*, |
| ExceptionState&) const, |
| ExceptionState& exception_state) { |
| if (stream_broken_) { |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Body stream has suffered a fatal error and cannot be inspected"); |
| return base::nullopt; |
| } |
| ScriptState::Scope scope(script_state_); |
| base::Optional<bool> result = |
| (stream_->*predicate)(script_state_, exception_state); |
| if (exception_state.HadException()) { |
| stream_broken_ = true; |
| return base::nullopt; |
| } |
| return result; |
| } |
| |
| BytesConsumer* BodyStreamBuffer::ReleaseHandle( |
| ExceptionState& exception_state) { |
| DCHECK(!IsStreamLockedForDCheck(exception_state)); |
| DCHECK(!IsStreamDisturbedForDCheck(exception_state)); |
| |
| if (stream_broken_) { |
| exception_state.ThrowDOMException( |
| DOMExceptionCode::kInvalidStateError, |
| "Body stream has suffered a fatal error and cannot be inspected"); |
| return nullptr; |
| } |
| |
| if (made_from_readable_stream_) { |
| ScriptState::Scope scope(script_state_); |
| // We need to have |reader| alive by some means (as noted in |
| // ReadableStreamDataConsumerHandle). Based on the following facts: |
| // - This function is used only from Tee and StartLoading. |
| // - This branch cannot be taken when called from Tee. |
| // - StartLoading makes HasPendingActivity return true while loading. |
| // - ReadableStream holds a reference to |reader| inside JS. |
| // we don't need to keep the reader explicitly. |
| ScriptValue reader = stream_->getReader(script_state_, exception_state); |
| if (exception_state.HadException()) { |
| stream_broken_ = true; |
| return nullptr; |
| } |
| return new ReadableStreamBytesConsumer(script_state_, reader); |
| } |
| // We need to call these before calling CloseAndLockAndDisturb. |
| const base::Optional<bool> is_closed = IsStreamClosed(exception_state); |
| if (exception_state.HadException()) |
| return nullptr; |
| const base::Optional<bool> is_errored = IsStreamErrored(exception_state); |
| if (exception_state.HadException()) |
| return nullptr; |
| |
| BytesConsumer* consumer = consumer_.Release(); |
| |
| CloseAndLockAndDisturb(exception_state); |
| if (exception_state.HadException()) |
| return nullptr; |
| |
| if (is_closed.value()) { |
| // Note that the stream cannot be "draining", because it doesn't have |
| // the internal buffer. |
| return BytesConsumer::CreateClosed(); |
| } |
| if (is_errored.value()) |
| return BytesConsumer::CreateErrored(BytesConsumer::Error("error")); |
| |
| DCHECK(consumer); |
| consumer->ClearClient(); |
| return consumer; |
| } |
| |
| } // namespace blink |