blob: 8ee6d0b8a64ea3d76ba011bf4c04cfaa3cc633da [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
(function(global, binding, v8) {
'use strict';
const _reader = v8.createPrivateSymbol('[[reader]]');
const _storedError = v8.createPrivateSymbol('[[storedError]]');
const _controller = v8.createPrivateSymbol('[[controller]]');
const _closedPromise = v8.createPrivateSymbol('[[closedPromise]]');
const _ownerReadableStream =
v8.createPrivateSymbol('[[ownerReadableStream]]');
const _readRequests = v8.createPrivateSymbol('[[readRequests]]');
const createWithExternalControllerSentinel =
v8.createPrivateSymbol('flag for UA-created ReadableStream to pass');
const _readableStreamBits =
v8.createPrivateSymbol('bit field for [[state]] and [[disturbed]]');
const DISTURBED = 0b1;
// The 2nd and 3rd bit are for [[state]].
const STATE_MASK = 0b110;
const STATE_BITS_OFFSET = 1;
const STATE_READABLE = 0;
const STATE_CLOSED = 1;
const STATE_ERRORED = 2;
const _controlledReadableStream =
v8.createPrivateSymbol('[[controlledReadableStream]]');
const _strategyHWM = v8.createPrivateSymbol('[[strategyHWM]]');
const _readableStreamDefaultControllerBits = v8.createPrivateSymbol(
'bit field for [[started]], [[closeRequested]], [[pulling]], ' +
'[[pullAgain]]');
// Remove this once C++ code has been updated to use CreateReadableStream.
const _lockNotifyTarget = v8.createPrivateSymbol('[[lockNotifyTarget]]');
const _strategySizeAlgorithm = v8.createPrivateSymbol(
'[[strategySizeAlgorithm]]');
const _pullAlgorithm = v8.createPrivateSymbol('[[pullAlgorithm]]');
const _cancelAlgorithm = v8.createPrivateSymbol('[[cancelAlgorithm]]');
const STARTED = 0b1;
const CLOSE_REQUESTED = 0b10;
const PULLING = 0b100;
const PULL_AGAIN = 0b1000;
// TODO(ricea): Remove this once blink::UnderlyingSourceBase no longer needs
// it.
const BLINK_LOCK_NOTIFICATIONS = 0b10000;
const defineProperty = global.Object.defineProperty;
const ObjectCreate = global.Object.create;
const callFunction = v8.uncurryThis(global.Function.prototype.call);
const applyFunction = v8.uncurryThis(global.Function.prototype.apply);
const TypeError = global.TypeError;
const RangeError = global.RangeError;
const Boolean = global.Boolean;
const String = global.String;
const Promise = global.Promise;
const thenPromise = v8.uncurryThis(Promise.prototype.then);
const Promise_resolve = Promise.resolve.bind(Promise);
const Promise_reject = Promise.reject.bind(Promise);
// From CommonOperations.js
const {
_queue,
_queueTotalSize,
hasOwnPropertyNoThrow,
rejectPromise,
resolvePromise,
markPromiseAsHandled,
CallOrNoop1,
CreateAlgorithmFromUnderlyingMethod,
CreateAlgorithmFromUnderlyingMethodPassingController,
CreateCrossRealmTransformReadable,
CreateCrossRealmTransformWritable,
DequeueValue,
EnqueueValueWithSize,
MakeSizeAlgorithmFromSizeFunction,
ValidateAndNormalizeHighWaterMark,
} = binding.streamOperations;
const streamErrors = binding.streamErrors;
const errCancelLockedStream =
'Cannot cancel a readable stream that is locked to a reader';
const errEnqueueCloseRequestedStream =
'Cannot enqueue a chunk into a readable stream that is closed or ' +
'has been requested to be closed';
const errCancelReleasedReader =
'This readable stream reader has been released and cannot be used ' +
'to cancel its previous owner stream';
const errReadReleasedReader =
'This readable stream reader has been released and cannot be used ' +
'to read from its previous owner stream';
const errCloseCloseRequestedStream =
'Cannot close a readable stream that has already been requested to ' +
'be closed';
const errEnqueueClosedStream =
'Cannot enqueue a chunk into a closed readable stream';
const errEnqueueErroredStream =
'Cannot enqueue a chunk into an errored readable stream';
const errCloseClosedStream = 'Cannot close a closed readable stream';
const errCloseErroredStream = 'Cannot close an errored readable stream';
const errGetReaderNotByteStream =
'This readable stream does not support BYOB readers';
const errGetReaderBadMode =
'Invalid reader mode given: expected undefined or "byob"';
const errReaderConstructorBadArgument =
'ReadableStreamReader constructor argument is not a readable stream';
const errReaderConstructorStreamAlreadyLocked =
'ReadableStreamReader constructor can only accept readable streams ' +
'that are not yet locked to a reader';
const errReleaseReaderWithPendingRead =
'Cannot release a readable stream reader when it still has ' +
'outstanding read() calls that have not yet settled';
const errReleasedReaderClosedPromise =
'This readable stream reader has been released and cannot be used ' +
'to monitor the stream\'s state';
const errCannotPipeLockedStream = 'Cannot pipe a locked stream';
const errCannotPipeToALockedStream = 'Cannot pipe to a locked stream';
const errDestinationStreamClosed = 'Destination stream closed';
const errPipeThroughUndefinedWritable =
'Failed to execute \'pipeThrough\' on \'ReadableStream\': parameter ' +
'1\'s \'writable\' property is undefined.';
const errPipeThroughUndefinedReadable =
'Failed to execute \'pipeThrough\' on \'ReadableStream\': parameter ' +
'1\'s \'readable\' property is undefined.';
const errCannotTransferLockedStream = 'Cannot transfer a locked stream';
const errCannotTransferUnsupportedContext =
'Cannot transfer from this context';
let useCounted = false;
class ReadableStream {
// TODO(ricea): Remove |internalArgument| once
// blink::ReadableStreamOperations has been updated to use
// CreateReadableStream.
constructor(underlyingSource = {}, strategy = {},
internalArgument = undefined) {
const enableBlinkLockNotifications =
internalArgument === createWithExternalControllerSentinel;
if (!useCounted && !enableBlinkLockNotifications) {
binding.countUse('ReadableStreamConstructor');
useCounted = true;
}
InitializeReadableStream(this);
const size = strategy.size;
let highWaterMark = strategy.highWaterMark;
const type = underlyingSource.type;
const typeString = String(type);
if (typeString === 'bytes') {
throw new RangeError('bytes type is not yet implemented');
}
if (type !== undefined) {
throw new RangeError(streamErrors.invalidType);
}
const sizeAlgorithm = MakeSizeAlgorithmFromSizeFunction(size);
if (highWaterMark === undefined) {
highWaterMark = 1;
}
highWaterMark = ValidateAndNormalizeHighWaterMark(highWaterMark);
SetUpReadableStreamDefaultControllerFromUnderlyingSource(
this, underlyingSource, highWaterMark, sizeAlgorithm,
enableBlinkLockNotifications);
}
get locked() {
if (IsReadableStream(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
return IsReadableStreamLocked(this);
}
cancel(reason) {
if (IsReadableStream(this) === false) {
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
if (IsReadableStreamLocked(this) === true) {
return Promise_reject(new TypeError(errCancelLockedStream));
}
return ReadableStreamCancel(this, reason);
}
getReader({mode} = {}) {
if (IsReadableStream(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
if (mode === undefined) {
return AcquireReadableStreamDefaultReader(this);
}
mode = String(mode);
if (mode === 'byob') {
// TODO(ricea): When BYOB readers are supported:
//
// Return ? AcquireReadableStreamBYOBReader(this).
throw new TypeError(errGetReaderNotByteStream);
}
throw new RangeError(errGetReaderBadMode);
}
pipeThrough({writable, readable}, options) {
if (writable === undefined) {
throw new TypeError(errPipeThroughUndefinedWritable);
}
if (readable === undefined) {
throw new TypeError(errPipeThroughUndefinedReadable);
}
const promise = this.pipeTo(writable, options);
if (v8.isPromise(promise)) {
markPromiseAsHandled(promise);
}
return readable;
}
pipeTo(dest, {preventClose, preventAbort, preventCancel} = {}) {
if (!IsReadableStream(this)) {
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
if (!binding.IsWritableStream(dest)) {
// TODO(ricea): Think about having a better error message.
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
if (IsReadableStreamLocked(this)) {
return Promise_reject(new TypeError(errCannotPipeLockedStream));
}
if (binding.IsWritableStreamLocked(dest)) {
return Promise_reject(new TypeError(errCannotPipeToALockedStream));
}
return ReadableStreamPipeTo(
this, dest, preventClose, preventAbort, preventCancel);
}
tee() {
if (IsReadableStream(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
return ReadableStreamTee(this);
}
}
const ReadableStream_prototype = ReadableStream.prototype;
function ReadableStreamPipeTo(
readable, dest, preventClose, preventAbort, preventCancel) {
// Callers of this function must ensure that the following invariants
// are enforced:
// assert(IsReadableStream(readable));
// assert(binding.IsWritableStream(dest));
// assert(!IsReadableStreamLocked(readable));
// assert(!binding.IsWritableStreamLocked(dest));
const reader = AcquireReadableStreamDefaultReader(readable);
const writer = binding.AcquireWritableStreamDefaultWriter(dest);
let shuttingDown = false;
const promise = v8.createPromise();
let reading = false;
let lastWrite;
if (checkInitialState()) {
// Need to detect closing and error when we are not reading.
thenPromise(reader[_closedPromise], onReaderClosed, readableError);
// Need to detect error when we are not writing.
thenPromise(
binding.getWritableStreamDefaultWriterClosedPromise(writer),
undefined, writableError);
pump();
}
// Checks the state of the streams and executes the shutdown handlers if
// necessary. Returns true if piping can continue.
function checkInitialState() {
const state = ReadableStreamGetState(readable);
// Both streams can be errored or closed. To perform the right action the
// order of the checks must match the standard.
if (state === STATE_ERRORED) {
readableError(readable[_storedError]);
return false;
}
if (binding.isWritableStreamErrored(dest)) {
writableError(binding.getWritableStreamStoredError(dest));
return false;
}
if (state === STATE_CLOSED) {
readableClosed();
return false;
}
if (binding.isWritableStreamClosingOrClosed(dest)) {
writableStartedClosed();
return false;
}
return true;
}
function pump() {
if (shuttingDown) {
return;
}
const desiredSize =
binding.WritableStreamDefaultWriterGetDesiredSize(writer);
if (desiredSize === null) {
// This can happen if abort() is queued but not yet started when
// pipeTo() is called. In that case [[storedError]] is not set yet, and
// we need to wait until it is before we can cancel the pipe. Once
// [[storedError]] has been set, the rejection handler set on the writer
// closed promise above will detect it, so all we need to do here is
// nothing.
return;
}
if (desiredSize <= 0) {
thenPromise(
binding.getWritableStreamDefaultWriterReadyPromise(writer), pump,
writableError);
return;
}
reading = true;
thenPromise(
ReadableStreamDefaultReaderRead(reader), readFulfilled, readRejected);
}
function readFulfilled({value, done}) {
reading = false;
if (done) {
readableClosed();
return;
}
const write = binding.WritableStreamDefaultWriterWrite(writer, value);
lastWrite = write;
thenPromise(write, undefined, writableError);
pump();
}
function readRejected() {
reading = false;
readableError(readable[_storedError]);
}
// If read() is in progress, then wait for it to tell us that the stream is
// closed so that we write all the data before shutdown.
function onReaderClosed() {
if (!reading) {
readableClosed();
}
}
// These steps are from "Errors must be propagated forward" in the
// standard.
function readableError(error) {
if (!preventAbort) {
shutdownWithAction(
binding.WritableStreamAbort, [dest, error], error, true);
} else {
shutdown(error, true);
}
}
// These steps are from "Errors must be propagated backward".
function writableError(error) {
if (!preventCancel) {
shutdownWithAction(
ReadableStreamCancel, [readable, error], error, true);
} else {
shutdown(error, true);
}
}
// These steps are from "Closing must be propagated forward".
function readableClosed() {
if (!preventClose) {
shutdownWithAction(
binding.WritableStreamDefaultWriterCloseWithErrorPropagation,
[writer]);
} else {
shutdown();
}
}
// These steps are from "Closing must be propagated backward".
function writableStartedClosed() {
const destClosed = new TypeError(errDestinationStreamClosed);
if (!preventCancel) {
shutdownWithAction(
ReadableStreamCancel, [readable, destClosed], destClosed, true);
} else {
shutdown(destClosed, true);
}
}
function shutdownWithAction(
action, args, originalError = undefined, errorGiven = false) {
if (shuttingDown) {
return;
}
shuttingDown = true;
let p;
if (shouldWriteQueuedChunks()) {
p = thenPromise(writeQueuedChunks(),
() => applyFunction(action, undefined, args));
} else {
p = applyFunction(action, undefined, args);
}
thenPromise(
p, () => finalize(originalError, errorGiven),
newError => finalize(newError, true));
}
function shutdown(error = undefined, errorGiven = false) {
if (shuttingDown) {
return;
}
shuttingDown = true;
if (shouldWriteQueuedChunks()) {
thenPromise(writeQueuedChunks(), () => finalize(error, errorGiven));
} else {
finalize(error, errorGiven);
}
}
function finalize(error, errorGiven) {
binding.WritableStreamDefaultWriterRelease(writer);
ReadableStreamReaderGenericRelease(reader);
if (errorGiven) {
rejectPromise(promise, error);
} else {
resolvePromise(promise, undefined);
}
}
function shouldWriteQueuedChunks() {
return binding.isWritableStreamWritable(dest) &&
!binding.WritableStreamCloseQueuedOrInFlight(dest);
}
function writeQueuedChunks() {
if (lastWrite) {
// "Wait until every chunk that has been read has been written (i.e.
// the corresponding promises have settled)"
// This implies that we behave the same whether the promise fulfills or
// rejects.
return thenPromise(lastWrite, () => undefined, () => undefined);
}
return Promise_resolve(undefined);
}
return promise;
}
//
// Readable stream abstract operations
//
function AcquireReadableStreamDefaultReader(stream) {
return new ReadableStreamDefaultReader(stream);
}
// The non-standard boolean |enableBlinkLockNotifications| argument indicates
// whether the stream is being created from C++.
function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm,
highWaterMark, sizeAlgorithm,
enableBlinkLockNotifications) {
if (highWaterMark === undefined) {
highWaterMark = 1;
}
if (sizeAlgorithm === undefined) {
sizeAlgorithm = () => 1;
}
// assert(IsNonNegativeNumber(highWaterMark),
// '! IsNonNegativeNumber(highWaterMark) is true.');
const stream = ObjectCreate(ReadableStream_prototype);
InitializeReadableStream(stream);
const controller = ObjectCreate(ReadableStreamDefaultController_prototype);
SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm,
highWaterMark, sizeAlgorithm, enableBlinkLockNotifications);
return stream;
}
function InitializeReadableStream(stream) {
stream[_readableStreamBits] = 0b0;
ReadableStreamSetState(stream, STATE_READABLE);
stream[_reader] = undefined;
stream[_storedError] = undefined;
}
function IsReadableStream(x) {
return hasOwnPropertyNoThrow(x, _controller);
}
function IsReadableStreamDisturbed(stream) {
return stream[_readableStreamBits] & DISTURBED;
}
function IsReadableStreamLocked(stream) {
return stream[_reader] !== undefined;
}
// TODO(domenic): cloneForBranch2 argument from spec not supported yet
function ReadableStreamTee(stream) {
const reader = AcquireReadableStreamDefaultReader(stream);
let closedOrErrored = false;
let canceled1 = false;
let canceled2 = false;
let reason1;
let reason2;
const cancelPromise = v8.createPromise();
function pullAlgorithm() {
return thenPromise(
ReadableStreamDefaultReaderRead(reader), ({value, done}) => {
if (done && !closedOrErrored) {
if (!canceled1) {
ReadableStreamDefaultControllerClose(branch1controller);
}
if (!canceled2) {
ReadableStreamDefaultControllerClose(branch2controller);
}
closedOrErrored = true;
}
if (closedOrErrored) {
return;
}
// TODO(ricea): Implement these steps for cloning.
//
// vii. Let _value1_ and _value2_ be _value_.
// viii. If _canceled2_ is false and _cloneForBranch2_ is true, set
// value2 to ? StructuredDeserialize(? StructuredSerialize(value2),
// the current Realm Record).
if (!canceled1) {
ReadableStreamDefaultControllerEnqueue(branch1controller, value);
}
if (!canceled2) {
ReadableStreamDefaultControllerEnqueue(branch2controller, value);
}
});
}
function cancel1Algorithm(reason) {
canceled1 = true;
reason1 = reason;
if (canceled2) {
const cancelResult = ReadableStreamCancel(stream, [reason1, reason2]);
resolvePromise(cancelPromise, cancelResult);
}
return cancelPromise;
}
function cancel2Algorithm(reason) {
canceled2 = true;
reason2 = reason;
if (canceled1) {
const cancelResult = ReadableStreamCancel(stream, [reason1, reason2]);
resolvePromise(cancelPromise, cancelResult);
}
return cancelPromise;
}
const startAlgorithm = () => undefined;
const branch1Stream = CreateReadableStream(
startAlgorithm, pullAlgorithm, cancel1Algorithm, undefined, undefined,
false);
const branch2Stream = CreateReadableStream(
startAlgorithm, pullAlgorithm, cancel2Algorithm, undefined, undefined,
false);
const branch1controller = branch1Stream[_controller];
const branch2controller = branch2Stream[_controller];
thenPromise(reader[_closedPromise], undefined, r => {
if (closedOrErrored === true) {
return;
}
ReadableStreamDefaultControllerError(branch1controller, r);
ReadableStreamDefaultControllerError(branch2controller, r);
closedOrErrored = true;
});
return [branch1Stream, branch2Stream];
}
//
// Abstract Operations Used By Controllers
//
function ReadableStreamAddReadRequest(stream, forAuthorCode) {
const promise = v8.createPromise();
stream[_reader][_readRequests].push({promise, forAuthorCode});
return promise;
}
function ReadableStreamCancel(stream, reason) {
stream[_readableStreamBits] |= DISTURBED;
const state = ReadableStreamGetState(stream);
if (state === STATE_CLOSED) {
return Promise_resolve(undefined);
}
if (state === STATE_ERRORED) {
return Promise_reject(stream[_storedError]);
}
ReadableStreamClose(stream);
const sourceCancelPromise =
ReadableStreamDefaultControllerCancel(stream[_controller], reason);
return thenPromise(sourceCancelPromise, () => undefined);
}
function ReadableStreamClose(stream) {
ReadableStreamSetState(stream, STATE_CLOSED);
const reader = stream[_reader];
if (reader === undefined) {
return;
}
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(
request =>
resolvePromise(
request.promise,
ReadableStreamCreateReadResult(undefined, true,
request.forAuthorCode)));
reader[_readRequests] = new binding.SimpleQueue();
}
resolvePromise(reader[_closedPromise], undefined);
}
function ReadableStreamCreateReadResult(value, done, forAuthorCode) {
// assert(typeof done === 'boolean', 'Type(_done_) is Boolean.');
if (forAuthorCode) {
return {value, done};
}
const obj = ObjectCreate(null);
obj.value = value;
obj.done = done;
return obj;
}
function ReadableStreamError(stream, e) {
ReadableStreamSetState(stream, STATE_ERRORED);
stream[_storedError] = e;
const reader = stream[_reader];
if (reader === undefined) {
return;
}
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(request =>
rejectPromise(request.promise, e));
reader[_readRequests] = new binding.SimpleQueue();
}
rejectPromise(reader[_closedPromise], e);
markPromiseAsHandled(reader[_closedPromise]);
}
function ReadableStreamFulfillReadRequest(stream, chunk, done) {
const readRequest = stream[_reader][_readRequests].shift();
resolvePromise(readRequest.promise,
ReadableStreamCreateReadResult(chunk, done,
readRequest.forAuthorCode));
}
function ReadableStreamGetNumReadRequests(stream) {
const reader = stream[_reader];
const readRequests = reader[_readRequests];
return readRequests.length;
}
//
// Class ReadableStreamDefaultReader
//
class ReadableStreamDefaultReader {
constructor(stream) {
if (IsReadableStream(stream) === false) {
throw new TypeError(errReaderConstructorBadArgument);
}
if (IsReadableStreamLocked(stream) === true) {
throw new TypeError(errReaderConstructorStreamAlreadyLocked);
}
ReadableStreamReaderGenericInitialize(this, stream);
this[_readRequests] = new binding.SimpleQueue();
}
get closed() {
if (IsReadableStreamDefaultReader(this) === false) {
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
return this[_closedPromise];
}
cancel(reason) {
if (IsReadableStreamDefaultReader(this) === false) {
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
if (this[_ownerReadableStream] === undefined) {
return Promise_reject(new TypeError(errCancelReleasedReader));
}
return ReadableStreamReaderGenericCancel(this, reason);
}
read() {
if (IsReadableStreamDefaultReader(this) === false) {
return Promise_reject(new TypeError(streamErrors.illegalInvocation));
}
if (this[_ownerReadableStream] === undefined) {
return Promise_reject(new TypeError(errReadReleasedReader));
}
return ReadableStreamDefaultReaderRead(this, true);
}
releaseLock() {
if (IsReadableStreamDefaultReader(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
if (this[_ownerReadableStream] === undefined) {
return;
}
if (this[_readRequests].length > 0) {
throw new TypeError(errReleaseReaderWithPendingRead);
}
ReadableStreamReaderGenericRelease(this);
}
}
//
// Readable Stream Reader Abstract Operations
//
function IsReadableStreamDefaultReader(x) {
return hasOwnPropertyNoThrow(x, _readRequests);
}
function ReadableStreamReaderGenericCancel(reader, reason) {
return ReadableStreamCancel(reader[_ownerReadableStream], reason);
}
function ReadableStreamReaderGenericInitialize(reader, stream) {
// TODO(yhirano): Remove this when we don't need hasPendingActivity in
// blink::UnderlyingSourceBase.
const controller = stream[_controller];
if (controller[_readableStreamDefaultControllerBits] &
BLINK_LOCK_NOTIFICATIONS) {
// The stream is created with an external controller (i.e. made in
// Blink).
const lockNotifyTarget = controller[_lockNotifyTarget];
callFunction(lockNotifyTarget.notifyLockAcquired, lockNotifyTarget);
}
reader[_ownerReadableStream] = stream;
stream[_reader] = reader;
switch (ReadableStreamGetState(stream)) {
case STATE_READABLE:
reader[_closedPromise] = v8.createPromise();
break;
case STATE_CLOSED:
reader[_closedPromise] = Promise_resolve(undefined);
break;
case STATE_ERRORED:
reader[_closedPromise] = Promise_reject(stream[_storedError]);
markPromiseAsHandled(reader[_closedPromise]);
break;
}
}
function ReadableStreamReaderGenericRelease(reader) {
// TODO(yhirano): Remove this when we don't need hasPendingActivity in
// blink::UnderlyingSourceBase.
const controller = reader[_ownerReadableStream][_controller];
if (controller[_readableStreamDefaultControllerBits] &
BLINK_LOCK_NOTIFICATIONS) {
// The stream is created with an external controller (i.e. made in
// Blink).
const lockNotifyTarget = controller[_lockNotifyTarget];
callFunction(lockNotifyTarget.notifyLockReleased, lockNotifyTarget);
}
if (ReadableStreamGetState(reader[_ownerReadableStream]) ===
STATE_READABLE) {
rejectPromise(
reader[_closedPromise],
new TypeError(errReleasedReaderClosedPromise));
} else {
reader[_closedPromise] =
Promise_reject(new TypeError(errReleasedReaderClosedPromise));
}
markPromiseAsHandled(reader[_closedPromise]);
reader[_ownerReadableStream][_reader] = undefined;
reader[_ownerReadableStream] = undefined;
}
function ReadableStreamDefaultReaderRead(reader, forAuthorCode = false) {
const stream = reader[_ownerReadableStream];
stream[_readableStreamBits] |= DISTURBED;
switch (ReadableStreamGetState(stream)) {
case STATE_CLOSED:
return Promise_resolve(ReadableStreamCreateReadResult(undefined, true,
forAuthorCode));
case STATE_ERRORED:
return Promise_reject(stream[_storedError]);
default:
return ReadableStreamDefaultControllerPull(stream[_controller],
forAuthorCode);
}
}
//
// Class ReadableStreamDefaultController
//
class ReadableStreamDefaultController {
constructor() {
throw new TypeError(streamErrors.illegalConstructor);
}
get desiredSize() {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
return ReadableStreamDefaultControllerGetDesiredSize(this);
}
close() {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
let errorDescription;
if (this[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
errorDescription = errCloseCloseRequestedStream;
} else {
const stream = this[_controlledReadableStream];
switch (ReadableStreamGetState(stream)) {
case STATE_ERRORED:
errorDescription = errCloseErroredStream;
break;
case STATE_CLOSED:
errorDescription = errCloseClosedStream;
break;
}
}
throw new TypeError(errorDescription);
}
return ReadableStreamDefaultControllerClose(this);
}
enqueue(chunk) {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(this)) {
const stream = this[_controlledReadableStream];
throw getReadableStreamEnqueueError(stream, this);
}
return ReadableStreamDefaultControllerEnqueue(this, chunk);
}
error(e) {
if (IsReadableStreamDefaultController(this) === false) {
throw new TypeError(streamErrors.illegalInvocation);
}
return ReadableStreamDefaultControllerError(this, e);
}
}
const ReadableStreamDefaultController_prototype =
ReadableStreamDefaultController.prototype;
// [[CancelSteps]] in the standard.
function ReadableStreamDefaultControllerCancel(controller, reason) {
controller[_queue] = new binding.SimpleQueue();
return controller[_cancelAlgorithm](reason);
}
// [[PullSteps]] in the standard.
function ReadableStreamDefaultControllerPull(controller, forAuthorCode) {
const stream = controller[_controlledReadableStream];
if (controller[_queue].length > 0) {
const chunk = DequeueValue(controller);
if ((controller[_readableStreamDefaultControllerBits] &
CLOSE_REQUESTED) &&
controller[_queue].length === 0) {
ReadableStreamClose(stream);
} else {
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
return Promise_resolve(ReadableStreamCreateReadResult(chunk, false,
forAuthorCode));
}
const pendingPromise = ReadableStreamAddReadRequest(stream, forAuthorCode);
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
return pendingPromise;
}
//
// Readable Stream Default Controller Abstract Operations
//
function IsReadableStreamDefaultController(x) {
return hasOwnPropertyNoThrow(x, _controlledReadableStream);
}
function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
const shouldPull =
ReadableStreamDefaultControllerShouldCallPull(controller);
if (shouldPull === false) {
return;
}
if (controller[_readableStreamDefaultControllerBits] & PULLING) {
controller[_readableStreamDefaultControllerBits] |= PULL_AGAIN;
return;
}
controller[_readableStreamDefaultControllerBits] |= PULLING;
thenPromise(
controller[_pullAlgorithm](),
() => {
controller[_readableStreamDefaultControllerBits] &= ~PULLING;
if (controller[_readableStreamDefaultControllerBits] & PULL_AGAIN) {
controller[_readableStreamDefaultControllerBits] &= ~PULL_AGAIN;
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
},
e => {
ReadableStreamDefaultControllerError(controller, e);
});
}
function ReadableStreamDefaultControllerShouldCallPull(controller) {
if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
return false;
}
if (!(controller[_readableStreamDefaultControllerBits] & STARTED)) {
return false;
}
const stream = controller[_controlledReadableStream];
if (IsReadableStreamLocked(stream) === true &&
ReadableStreamGetNumReadRequests(stream) > 0) {
return true;
}
const desiredSize =
ReadableStreamDefaultControllerGetDesiredSize(controller);
// assert(desiredSize !== null, '_desiredSize_ is not *null*.');
return desiredSize > 0;
}
function ReadableStreamDefaultControllerClose(controller) {
controller[_readableStreamDefaultControllerBits] |= CLOSE_REQUESTED;
if (controller[_queue].length === 0) {
ReadableStreamClose(controller[_controlledReadableStream]);
}
}
function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
const stream = controller[_controlledReadableStream];
if (IsReadableStreamLocked(stream) === true &&
ReadableStreamGetNumReadRequests(stream) > 0) {
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize;
// TODO(ricea): Would it be more efficient if we avoided the
// try ... catch when we're using the default strategy size algorithm?
try {
// Unlike other algorithms, strategySizeAlgorithm isn't indirected, so
// we need to be careful with the |this| value.
chunkSize = callFunction(controller[_strategySizeAlgorithm], undefined,
chunk);
} catch (chunkSizeE) {
ReadableStreamDefaultControllerError(controller, chunkSizeE);
throw chunkSizeE;
}
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
} catch (enqueueE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
throw enqueueE;
}
}
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}
function ReadableStreamDefaultControllerError(controller, e) {
const stream = controller[_controlledReadableStream];
if (ReadableStreamGetState(stream) !== STATE_READABLE) {
return;
}
controller[_queue] = new binding.SimpleQueue();
ReadableStreamError(stream, e);
}
function ReadableStreamDefaultControllerGetDesiredSize(controller) {
switch (ReadableStreamGetState(controller[_controlledReadableStream])) {
case STATE_ERRORED:
return null;
case STATE_CLOSED:
return 0;
default:
return controller[_strategyHWM] - controller[_queueTotalSize];
}
}
function ReadableStreamDefaultControllerHasBackpressure(controller) {
return !ReadableStreamDefaultControllerShouldCallPull(controller);
}
function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if (controller[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return false;
}
const state = ReadableStreamGetState(controller[_controlledReadableStream]);
return state === STATE_READABLE;
}
function SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm,
highWaterMark, sizeAlgorithm, enableBlinkLockNotifications) {
controller[_controlledReadableStream] = stream;
controller[_queue] = new binding.SimpleQueue();
controller[_queueTotalSize] = 0;
controller[_readableStreamDefaultControllerBits] =
enableBlinkLockNotifications ? BLINK_LOCK_NOTIFICATIONS : 0b0;
controller[_strategySizeAlgorithm] = sizeAlgorithm;
controller[_strategyHWM] = highWaterMark;
controller[_pullAlgorithm] = pullAlgorithm;
controller[_cancelAlgorithm] = cancelAlgorithm;
stream[_controller] = controller;
thenPromise(Promise_resolve(startAlgorithm()), () => {
controller[_readableStreamDefaultControllerBits] |= STARTED;
ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}, r => ReadableStreamDefaultControllerError(controller, r));
}
function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
stream, underlyingSource, highWaterMark, sizeAlgorithm,
enableBlinkLockNotifications) {
const controller = ObjectCreate(ReadableStreamDefaultController_prototype);
const startAlgorithm =
() => CallOrNoop1(underlyingSource, 'start', controller,
'underlyingSource.start');
const pullAlgorithm = CreateAlgorithmFromUnderlyingMethodPassingController(
underlyingSource, 'pull', 0, controller, 'underlyingSource.pull');
const cancelAlgorithm = CreateAlgorithmFromUnderlyingMethod(
underlyingSource, 'cancel', 1, 'underlyingSource.cancel');
// TODO(ricea): Remove this once C++ API has been updated.
if (enableBlinkLockNotifications) {
controller[_lockNotifyTarget] = underlyingSource;
}
SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm,
highWaterMark, sizeAlgorithm, enableBlinkLockNotifications);
}
//
// Functions for transferable streams.
//
function ReadableStreamSerialize(readable) {
// assert(IsReadableStream(readable),
// `! IsReadableStream(_readable_) is true`);
if (IsReadableStreamLocked(readable)) {
throw new TypeError(errCannotTransferLockedStream);
}
if (!binding.MessageChannel) {
throw new TypeError(errCannotTransferUnsupportedContext);
}
const mc = new binding.MessageChannel();
const writable = CreateCrossRealmTransformWritable(
callFunction(binding.MessageChannel_port1_get, mc));
const promise =
ReadableStreamPipeTo(readable, writable, false, false, false);
markPromiseAsHandled(promise);
return callFunction(binding.MessageChannel_port2_get, mc);
}
function ReadableStreamDeserialize(port) {
return CreateCrossRealmTransformReadable(port);
}
//
// Internal functions. Not part of the standard.
//
function ReadableStreamGetState(stream) {
return (stream[_readableStreamBits] & STATE_MASK) >> STATE_BITS_OFFSET;
}
function ReadableStreamSetState(stream, state) {
stream[_readableStreamBits] = (stream[_readableStreamBits] & ~STATE_MASK) |
(state << STATE_BITS_OFFSET);
}
//
// Functions exported for use by TransformStream. Not part of the standard.
//
function IsReadableStreamReadable(stream) {
return ReadableStreamGetState(stream) === STATE_READABLE;
}
function IsReadableStreamClosed(stream) {
return ReadableStreamGetState(stream) === STATE_CLOSED;
}
function IsReadableStreamErrored(stream) {
return ReadableStreamGetState(stream) === STATE_ERRORED;
}
// Used internally by enqueue() and also by TransformStream.
function getReadableStreamEnqueueError(stream, controller) {
if (controller[_readableStreamDefaultControllerBits] & CLOSE_REQUESTED) {
return new TypeError(errEnqueueCloseRequestedStream);
}
const state = ReadableStreamGetState(stream);
if (state === STATE_ERRORED) {
return new TypeError(errEnqueueErroredStream);
}
// assert(state === STATE_CLOSED, 'state is "closed"');
return new TypeError(errEnqueueClosedStream);
}
//
// Accessors used by TransformStream
//
function getReadableStreamController(stream) {
// assert(
// IsReadableStream(stream), '! IsReadableStream(stream) is true.');
return stream[_controller];
}
function getReadableStreamStoredError(stream) {
// assert(
// IsReadableStream(stream), '! IsReadableStream(stream) is true.');
return stream[_storedError];
}
// TODO(yhirano): Rename this to constructReadableStream.
function createReadableStream(underlyingSource, strategy) {
return new ReadableStream(underlyingSource, strategy);
}
// TODO(yhirano): Rename this to
// constructReadableStreamWithExternalController.
// TODO(ricea): Remove this once the C++ code switches to calling
// CreateReadableStream().
function createReadableStreamWithExternalController(
underlyingSource, strategy) {
return new ReadableStream(
underlyingSource, strategy, createWithExternalControllerSentinel);
}
//
// Additions to the global
//
defineProperty(global, 'ReadableStream', {
value: ReadableStream,
enumerable: false,
configurable: true,
writable: true
});
Object.assign(binding, {
//
// ReadableStream exports to Blink C++
//
AcquireReadableStreamDefaultReader,
createReadableStream,
createReadableStreamWithExternalController,
IsReadableStream,
IsReadableStreamDisturbed,
IsReadableStreamLocked,
IsReadableStreamReadable,
IsReadableStreamClosed,
IsReadableStreamErrored,
IsReadableStreamDefaultReader,
ReadableStreamDefaultReaderRead,
ReadableStreamCancel,
ReadableStreamTee,
ReadableStreamPipeTo,
ReadableStreamSerialize,
ReadableStreamDeserialize,
//
// Controller exports to Blink C++
//
ReadableStreamDefaultControllerClose,
ReadableStreamDefaultControllerGetDesiredSize,
ReadableStreamDefaultControllerEnqueue,
ReadableStreamDefaultControllerError,
//
// Exports to TransformStream
//
CreateReadableStream,
ReadableStreamDefaultControllerCanCloseOrEnqueue,
ReadableStreamDefaultControllerHasBackpressure,
getReadableStreamEnqueueError,
getReadableStreamController,
getReadableStreamStoredError,
});
});