blob: ab7c9b352d9c34c9b040de595ceaf9f54554edcb [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/quic/core/quic_session.h"
#include <cstdint>
#include <utility>
#include "net/quic/core/quic_connection.h"
#include "net/quic/core/quic_flags.h"
#include "net/quic/core/quic_flow_controller.h"
#include "net/quic/platform/api/quic_bug_tracker.h"
#include "net/quic/platform/api/quic_logging.h"
#include "net/quic/platform/api/quic_map_util.h"
#include "net/quic/platform/api/quic_str_cat.h"
using std::string;
namespace net {
// Log-line prefix naming which side of the connection this session is on.
// The expansion calls perspective(), so the macro is only usable where that
// name resolves (for example inside QuicSession member functions).
#define ENDPOINT \
(perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
// Builds a session on top of |connection|. |owner| is stored as visitor_ and
// is null-checked before each use in this file; config_ is initialized from
// |config|.
//
// Stream id parity is fixed here: a server's outgoing stream ids start at 2,
// a client's start at 3, and both advance by 2 (see GetNextOutgoingStreamId).
// All stream counters start at zero and error_ starts as QUIC_NO_ERROR.
QuicSession::QuicSession(QuicConnection* connection,
Visitor* owner,
const QuicConfig& config)
: connection_(connection),
visitor_(owner),
config_(config),
max_open_outgoing_streams_(kDefaultMaxStreamsPerConnection),
max_open_incoming_streams_(config_.GetMaxIncomingDynamicStreamsToSend()),
next_outgoing_stream_id_(perspective() == Perspective::IS_SERVER ? 2 : 3),
// Starting point for tracking the highest stream id the peer has opened.
largest_peer_created_stream_id_(
perspective() == Perspective::IS_SERVER ? 1 : 0),
num_dynamic_incoming_streams_(0),
num_draining_incoming_streams_(0),
num_locally_closed_incoming_streams_highest_offset_(0),
error_(QUIC_NO_ERROR),
// Session-wide flow controller.
// NOTE(review): the literal 0 is presumably the connection-level stream id
// (kConnectionLevelId) -- confirm against QuicFlowController's constructor.
flow_controller_(connection_,
0,
perspective(),
kMinimumFlowControlSendWindow,
config_.GetInitialSessionFlowControlWindowToSend(),
perspective() == Perspective::IS_SERVER,
nullptr),
currently_writing_stream_id_(0),
// Snapshot of the flag value taken at construction time.
flow_control_invariant_(
FLAGS_quic_reloadable_flag_quic_flow_control_invariant) {}
// Attaches this session to its connection and registers the crypto stream.
// Installs |this| as the connection's visitor, applies config_ to the
// connection, and stores the crypto stream in static_stream_map_ under
// kCryptoStreamId.
void QuicSession::Initialize() {
  connection_->set_visitor(this);
  connection_->SetFromConfig(config_);

  // The crypto stream must carry the well-known crypto stream id.
  DCHECK_EQ(kCryptoStreamId, GetCryptoStream()->id());
  auto* const crypto_stream = GetCryptoStream();
  static_stream_map_[kCryptoStreamId] = crypto_stream;
}
// Emits a warning at teardown when the number of locally closed streams that
// are still waiting for the peer's final byte offset exceeds the
// corresponding open-stream limit. Peer-initiated streams are compared with
// max_open_incoming_streams_, self-initiated streams with
// max_open_outgoing_streams_.
QuicSession::~QuicSession() {
  // The message is built from adjacent string literals, which the compiler
  // concatenates verbatim; the trailing space after "streams" keeps the two
  // halves from running together in the log output.
  QUIC_LOG_IF(WARNING, num_locally_closed_incoming_streams_highest_offset() >
                           max_open_incoming_streams_)
      << "Surprisingly high number of locally closed peer initiated streams "
         "still waiting for final byte offset: "
      << num_locally_closed_incoming_streams_highest_offset();
  QUIC_LOG_IF(WARNING, GetNumLocallyClosedOutgoingStreamsHighestOffset() >
                           max_open_outgoing_streams_)
      << "Surprisingly high number of locally closed self initiated streams "
         "still waiting for final byte offset: "
      << GetNumLocallyClosedOutgoingStreamsHighestOffset();
}
void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
// TODO(rch) deal with the error case of stream id 0.
QuicStreamId stream_id = frame.stream_id;
QuicStream* stream = GetOrCreateStream(stream_id);
if (!stream) {
// The stream no longer exists, but we may still be interested in the
// final stream byte offset sent by the peer. A frame with a FIN can give
// us this offset.
if (frame.fin) {
QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
UpdateFlowControlOnFinalReceivedByteOffset(stream_id, final_byte_offset);
}
return;
}
stream->OnStreamFrame(frame);
}
void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
if (QuicContainsKey(static_stream_map_, frame.stream_id)) {
connection()->CloseConnection(
QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream",
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
return;
}
if (visitor_) {
visitor_->OnRstStreamReceived(frame);
}
QuicStream* stream = GetOrCreateDynamicStream(frame.stream_id);
if (!stream) {
HandleRstOnValidNonexistentStream(frame);
return; // Errors are handled by GetOrCreateStream.
}
stream->OnStreamReset(frame);
}
// Handles a GOAWAY frame. The only action here is a debug assertion that the
// peer's last good stream id is below the next outgoing stream id this
// session would hand out.
void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
DCHECK(frame.last_good_stream_id < next_outgoing_stream_id_);
}
// Called once the connection has been closed. Records the first error seen,
// notifies every dynamic stream (force-closing any that does not close
// itself), and finally informs the visitor if there is one.
void QuicSession::OnConnectionClosed(QuicErrorCode error,
                                     const string& error_details,
                                     ConnectionCloseSource source) {
  DCHECK(!connection_->connected());
  // Keep the earliest error; later ones do not overwrite it.
  if (error_ == QUIC_NO_ERROR) {
    error_ = error;
  }

  // Streams remove themselves from the map as they close, so keep taking the
  // first entry until the map is drained.
  while (!dynamic_stream_map_.empty()) {
    auto first = dynamic_stream_map_.begin();
    const QuicStreamId id = first->first;
    first->second->OnConnectionClosed(error, source);
    // The stream should call CloseStream as part of OnConnectionClosed.
    if (QuicContainsKey(dynamic_stream_map_, id)) {
      QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed";
      CloseStream(id);
    }
  }

  if (visitor_ != nullptr) {
    visitor_->OnConnectionClosed(connection_->connection_id(), error,
                                 error_details);
  }
}
// Forwards the write-blocked notification for this session's connection to
// the visitor; does nothing when no visitor is set.
void QuicSession::OnWriteBlocked() {
  if (visitor_ == nullptr) {
    return;
  }
  visitor_->OnWriteBlocked(connection_);
}
// Version-negotiation notification; intentionally a no-op in this class.
void QuicSession::OnSuccessfulVersionNegotiation(
const QuicVersion& /*version*/) {}
// Path-degrading notification; intentionally a no-op in this class.
void QuicSession::OnPathDegrading() {}
void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
// Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
// assume that it still exists.
QuicStreamId stream_id = frame.stream_id;
if (stream_id == kConnectionLevelId) {
// This is a window update that applies to the connection, rather than an
// individual stream.
QUIC_DLOG(INFO) << ENDPOINT
<< "Received connection level flow control window "
"update with byte offset: "
<< frame.byte_offset;
flow_controller_.UpdateSendWindowOffset(frame.byte_offset);
return;
}
QuicStream* stream = GetOrCreateStream(stream_id);
if (stream != nullptr) {
stream->OnWindowUpdateFrame(frame);
}
}
// Handles a BLOCKED frame from the peer. Currently this only logs the stream
// id carried by the frame; no flow control state is touched.
void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
// TODO(rjshade): Compare our flow control receive windows for specified
// streams: if we have a large window then maybe something
// had gone wrong with the flow control accounting.
QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
<< frame.stream_id;
}
// Busy-loop detector for a stream that was just given a chance to write.
// |previous_bytes_written| and |previous_fin_sent| are the stream's values
// from before the write attempt. The stream's busy counter is incremented
// when the stream is still writable, the connection is not flow control
// blocked, and neither value changed; otherwise it is reset to zero.
// Returns false once the counter exceeds 20, true in every other case.
bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
                                            uint64_t previous_bytes_written,
                                            bool previous_fin_sent) {
  const bool made_no_progress =
      // Stream should not be closed.
      !stream->write_side_closed() &&
      // Not connection flow control blocked.
      !flow_controller_.IsBlocked() &&
      // Neither the byte count nor the FIN state moved.
      previous_bytes_written == stream->stream_bytes_written() &&
      previous_fin_sent == stream->fin_sent();
  if (!made_no_progress) {
    stream->set_busy_counter(0);
    return true;
  }

  stream->set_busy_counter(stream->busy_counter() + 1);
  QUIC_DVLOG(1) << "Suspected busy loop on stream id " << stream->id()
                << " stream_bytes_written " << stream->stream_bytes_written()
                << " fin " << stream->fin_sent() << " count "
                << stream->busy_counter();
  // The threshold is arbitrary; it only needs to be high enough to ride out
  // a handful of test-only false positives before firing.
  if (stream->busy_counter() <= 20) {
    return true;
  }
  QUIC_LOG(ERROR) << "Detected busy loop on stream id " << stream->id()
                  << " stream_bytes_written "
                  << stream->stream_bytes_written() << " fin "
                  << stream->fin_sent();
  return false;
}
// Gives write-blocked streams a chance to write, one stream per iteration.
// The number of iterations is fixed up front from the number of blocked
// streams; when the connection itself is flow control blocked, only the
// crypto and headers streams are counted.
void QuicSession::OnCanWrite() {
  // Cap the work at the number of streams pending right now. Streams that
  // become pending meanwhile make WillingAndAbleToWrite true, so the
  // connection asks to be resumed before yielding to other connections.
  size_t write_budget = write_blocked_streams_.NumBlockedStreams();
  if (flow_controller_.IsBlocked()) {
    // Every data stream is blocked at the connection level, so only the
    // crypto and headers streams may try to write.
    write_budget = write_blocked_streams_.crypto_stream_blocked() ? 1 : 0;
    if (write_blocked_streams_.headers_stream_blocked()) {
      ++write_budget;
    }
  }
  if (write_budget == 0) {
    return;
  }

  QuicConnection::ScopedPacketBundler ack_bundler(
      connection_, QuicConnection::SEND_ACK_IF_QUEUED);
  for (size_t remaining = write_budget; remaining > 0; --remaining) {
    const bool nothing_blocked =
        !write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() &&
        !write_blocked_streams_.HasWriteBlockedDataStreams();
    if (nothing_blocked) {
      // Writing one stream removed another!? Something's broken.
      QUIC_BUG << "WriteBlockedStream is missing";
      connection_->CloseConnection(QUIC_INTERNAL_ERROR,
                                   "WriteBlockedStream is missing",
                                   ConnectionCloseBehavior::SILENT_CLOSE);
      return;
    }
    if (!connection_->CanWriteStreamData()) {
      return;
    }

    currently_writing_stream_id_ = write_blocked_streams_.PopFront();
    QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
    if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
      // A stream that cannot write everything re-adds itself to the blocked
      // list.
      uint64_t previous_bytes_written = stream->stream_bytes_written();
      bool previous_fin_sent = stream->fin_sent();
      QUIC_DVLOG(1) << "stream " << stream->id() << " bytes_written "
                    << previous_bytes_written << " fin " << previous_fin_sent;
      stream->OnCanWrite();
      // NOTE(review): the busy-loop check lives inside DCHECK, so whether it
      // runs at all presumably depends on the build's DCHECK configuration
      // -- confirm this is intended.
      DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
                                       previous_fin_sent));
    }
    currently_writing_stream_id_ = 0;
  }
}
// Reports whether a write should be scheduled for this session.
bool QuicSession::WillingAndAbleToWrite() const {
  // A blocked crypto or headers stream always warrants a write: those
  // streams are not subject to connection level flow control.
  if (write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream()) {
    return true;
  }
  // Data streams only matter while the connection is not flow control
  // blocked.
  if (flow_controller_.IsBlocked()) {
    return false;
  }
  return write_blocked_streams_.HasWriteBlockedDataStreams();
}
// Returns whether the crypto stream is currently recorded as write blocked.
bool QuicSession::HasPendingHandshake() const {
return write_blocked_streams_.crypto_stream_blocked();
}
// Returns true when at least one dynamic stream counts as open: streams in
// the dynamic map that are not draining, plus locally closed streams still
// waiting for their final byte offset.
bool QuicSession::HasOpenDynamicStreams() const {
  const auto open_count = dynamic_stream_map_.size() -
                          draining_streams_.size() +
                          locally_closed_streams_highest_offset_.size();
  return open_count > 0;
}
// Hands a received UDP packet, together with the local and peer addresses,
// straight to the underlying connection for processing.
void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
const QuicSocketAddress& peer_address,
const QuicReceivedPacket& packet) {
connection_->ProcessUdpPacket(self_address, peer_address, packet);
}
// Writes stream data for |stream| (identified on the wire by |id|) through
// the connection and returns how much was consumed. Returns a zero-byte,
// no-FIN result when the write is refused: either |id| names the crypto
// stream but |stream| is not the crypto stream (which also closes the
// connection), or encryption is not yet established and |id| is not the
// crypto stream.
QuicConsumedData QuicSession::WritevData(
    QuicStream* stream,
    QuicStreamId id,
    QuicIOVector iov,
    QuicStreamOffset offset,
    bool fin,
    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
  const bool targets_crypto_id = (id == kCryptoStreamId);

  // Mitigation for potential memory corruption in which |id| ends up set to
  // 1 (the crypto stream id). If that happens, stream data could end up being
  // sent unencrypted. Sufficient corruption cannot be fully defended
  // against, but this check is a reasonable safeguard.
  if (targets_crypto_id && stream != GetCryptoStream()) {
    QUIC_BUG << "Stream id mismatch";
    connection_->CloseConnection(
        QUIC_INTERNAL_ERROR,
        "Non-crypto stream attempted to write data as crypto stream.",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return QuicConsumedData(0, false);
  }

  if (!IsEncryptionEstablished() && !targets_crypto_id) {
    // Streams may not write before encryption is in place. The caller stays
    // write blocked until the next OnCanWrite.
    return QuicConsumedData(0, false);
  }

  QuicConsumedData consumed = connection_->SendStreamData(
      id, iov, offset, fin, std::move(ack_listener));
  write_blocked_streams_.UpdateBytesForStream(id, consumed.bytes_consumed);
  return consumed;
}
// Resets stream |id| locally: sends a RST_STREAM frame carrying |error| and
// |bytes_written| if the connection is still up, then closes the stream as
// locally reset. Static streams cannot be reset; attempting it is reported
// as a bug and otherwise ignored.
void QuicSession::SendRstStream(QuicStreamId id,
                                QuicRstStreamErrorCode error,
                                QuicStreamOffset bytes_written) {
  const bool is_static = QuicContainsKey(static_stream_map_, id);
  if (is_static) {
    QUIC_BUG << "Cannot send RST for a static stream with ID " << id;
    return;
  }

  // The frame is only worth sending while the connection is alive.
  const bool still_connected = connection()->connected();
  if (still_connected) {
    connection_->SendRstStream(id, error, bytes_written);
  }
  CloseStreamInner(id, /*locally_reset=*/true);
}
// Sends a GOAWAY frame with |error_code| and |reason|, reporting the largest
// peer-created stream id seen so far. Does nothing if a GOAWAY has already
// been sent.
void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
  if (!goaway_sent()) {
    connection_->SendGoAway(error_code, largest_peer_created_stream_id_,
                            reason);
  }
}
// Closes stream |stream_id| without marking it as locally reset.
void QuicSession::CloseStream(QuicStreamId stream_id) {
  CloseStreamInner(stream_id, /*locally_reset=*/false);
}
// Records |offset| as the highest received offset for locally closed stream
// |id| (replacing any existing entry) and bumps the incoming-stream counter
// when |id| is a peer-initiated stream.
void QuicSession::InsertLocallyClosedStreamsHighestOffset(
    const QuicStreamId id,
    QuicStreamOffset offset) {
  locally_closed_streams_highest_offset_[id] = offset;
  if (!IsIncomingStream(id)) {
    return;
  }
  ++num_locally_closed_incoming_streams_highest_offset_;
}
// Shared implementation of stream closure. Moves the stream's ownership into
// closed_streams_, removes it from the dynamic and draining bookkeeping,
// remembers its highest received offset if the peer's final offset is still
// unknown, and then notifies the stream via OnClose(). |locally_reset| marks
// the stream as having had a RST sent. A no-op when |stream_id| is not in
// the dynamic stream map.
void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool locally_reset) {
  QUIC_DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;

  auto entry = dynamic_stream_map_.find(stream_id);
  if (entry == dynamic_stream_map_.end()) {
    // A recursive call (via QuicStream::OnClose) arrives after the stream
    // has already been removed from the map, so there is nothing left to do.
    QUIC_DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
    return;
  }

  // Raw pointer stays usable after the move below because closed_streams_
  // takes over ownership.
  QuicStream* closing = entry->second.get();
  if (locally_reset) {
    // Tell the stream that a RST has been sent.
    closing->set_rst_sent(true);
  }
  closed_streams_.push_back(std::move(entry->second));

  // Without a FIN or RST from the peer, remember how many bytes the stream's
  // flow controller believes it received, so that connection level flow
  // control accounting stays accurate.
  if (!closing->HasFinalReceivedByteOffset()) {
    InsertLocallyClosedStreamsHighestOffset(
        stream_id, closing->flow_controller()->highest_received_byte_offset());
  }

  dynamic_stream_map_.erase(entry);
  const bool is_incoming = IsIncomingStream(stream_id);
  if (is_incoming) {
    --num_dynamic_incoming_streams_;
  }

  // erase() reports whether the stream was actually draining.
  const bool was_draining = draining_streams_.erase(stream_id) > 0;
  if (was_draining && is_incoming) {
    --num_draining_incoming_streams_;
  }

  closing->OnClose();
  // Keep the connection's emulated open-stream count in step with the map.
  connection_->SetNumOpenStreams(dynamic_stream_map_.size());
}
// Called when the peer's final byte offset for a locally closed stream
// becomes known. Adjusts the connection-level flow controller by the
// difference between |final_byte_offset| and the offset recorded at close
// time, closes the connection on a flow control violation, and otherwise
// drops the stream from the "awaiting final offset" bookkeeping. Ignored for
// streams that are not being tracked.
void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset(
    QuicStreamId stream_id,
    QuicStreamOffset final_byte_offset) {
  auto tracked = locally_closed_streams_highest_offset_.find(stream_id);
  if (tracked == locally_closed_streams_highest_offset_.end()) {
    return;
  }

  QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
                << final_byte_offset << " for stream " << stream_id;
  // NOTE(review): this subtraction assumes |final_byte_offset| is not below
  // the recorded offset; nothing here checks that -- confirm with callers.
  const QuicByteCount extra_bytes = final_byte_offset - tracked->second;
  const bool offset_advanced = flow_controller_.UpdateHighestReceivedOffset(
      flow_controller_.highest_received_byte_offset() + extra_bytes);
  // If the final offset violates flow control, close the connection now.
  if (offset_advanced && flow_controller_.FlowControlViolation()) {
    connection_->CloseConnection(
        QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
        "Connection level flow control violation",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  flow_controller_.AddBytesConsumed(extra_bytes);
  locally_closed_streams_highest_offset_.erase(tracked);
  if (IsIncomingStream(stream_id)) {
    --num_locally_closed_incoming_streams_highest_offset_;
  }
}
bool QuicSession::IsEncryptionEstablished() {
return GetCryptoStream()->encryption_established();
}
bool QuicSession::IsCryptoHandshakeConfirmed() {
return GetCryptoStream()->handshake_confirmed();
}
// Applies the negotiated config to the session: re-applies config_ to the
// connection, sets the outgoing and incoming stream limits, optionally
// enlarges the initial receive windows (server side, behind a flag), and
// passes on any flow control send windows received from the peer.
void QuicSession::OnConfigNegotiated() {
  connection_->SetFromConfig(config_);

  const QuicVersion version = connection()->version();
  // Versions above 34 use the peer's advertised incoming-stream limit when
  // one was received; everything else uses the per-connection stream limit.
  const bool use_received_limit =
      version > QUIC_VERSION_34 &&
      config_.HasReceivedMaxIncomingDynamicStreams();
  const uint32_t max_streams = use_received_limit
                                   ? config_.ReceivedMaxIncomingDynamicStreams()
                                   : config_.MaxStreamsPerConnection();
  set_max_open_outgoing_streams(max_streams);

  if (FLAGS_quic_reloadable_flag_quic_large_ifw_options &&
      perspective() == Perspective::IS_SERVER &&
      config_.HasReceivedConnectionOptions()) {
    // Each of these connection options selects a different initial receive
    // flow control window size. They are checked in ascending order, so when
    // several are present the last matching one is applied last.
    const auto& options = config_.ReceivedConnectionOptions();
    if (ContainsQuicTag(options, kIFW6)) {
      AdjustInitialFlowControlWindows(64 * 1024);
    }
    if (ContainsQuicTag(options, kIFW7)) {
      AdjustInitialFlowControlWindows(128 * 1024);
    }
    if (ContainsQuicTag(options, kIFW8)) {
      AdjustInitialFlowControlWindows(256 * 1024);
    }
    if (ContainsQuicTag(options, kIFW9)) {
      AdjustInitialFlowControlWindows(512 * 1024);
    }
    if (ContainsQuicTag(options, kIFWA)) {
      AdjustInitialFlowControlWindows(1024 * 1024);
    }
  }

  // A small number of incoming streams beyond the nominal limit is allowed.
  // This avoids tearing the connection down early when FIN/RSTs for old
  // streams are lost or arrive out of order. The headroom is either a fixed
  // minimum increment or a percentage increase, whichever is larger. Up to
  // version 34 the nominal limit is |max_streams|; later versions use the
  // limit this endpoint advertises.
  const uint32_t incoming_base =
      (version <= QUIC_VERSION_34)
          ? max_streams
          : config_.GetMaxIncomingDynamicStreamsToSend();
  const uint32_t max_incoming_streams =
      std::max(incoming_base + kMaxStreamsMinimumIncrement,
               static_cast<uint32_t>(incoming_base * kMaxStreamsMultiplier));
  set_max_open_incoming_streams(max_incoming_streams);

  if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
    // Streams created before the SHLO was received (0-RTT requests) now
    // learn the peer's initial flow control window.
    OnNewStreamFlowControlWindow(
        config_.ReceivedInitialStreamFlowControlWindowBytes());
  }
  if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
    OnNewSessionFlowControlWindow(
        config_.ReceivedInitialSessionFlowControlWindowBytes());
  }
}
// Sets the stream receive window to |stream_window| and scales the session
// receive window so the session-to-stream ratio currently configured is
// preserved (1.5 is used when the configured stream window is zero). The new
// sizes are written to config_, applied to the session flow controller, and
// pushed to every existing static and dynamic stream.
void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
  // Capture the ratio before the stream window in config_ is overwritten.
  float session_window_multiplier = 1.5;
  const auto configured_stream_window =
      config_.GetInitialStreamFlowControlWindowToSend();
  if (configured_stream_window) {
    session_window_multiplier =
        static_cast<float>(
            config_.GetInitialSessionFlowControlWindowToSend()) /
        configured_stream_window;
  }

  QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
  config_.SetInitialStreamFlowControlWindowToSend(stream_window);

  const size_t session_window =
      static_cast<size_t>(session_window_multiplier * stream_window);
  QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
                << session_window;
  config_.SetInitialSessionFlowControlWindowToSend(session_window);
  flow_controller_.UpdateReceiveWindowSize(session_window);

  // Inform all existing streams about the new window.
  for (const auto& static_entry : static_stream_map_) {
    static_entry.second->flow_controller()->UpdateReceiveWindowSize(
        stream_window);
  }
  for (const auto& dynamic_entry : dynamic_stream_map_) {
    dynamic_entry.second->flow_controller()->UpdateReceiveWindowSize(
        stream_window);
  }
}
// Reacts to a frame addressed to a locally created stream that is not
// currently active by closing the connection with QUIC_INVALID_STREAM_ID.
// Callers are expected to have ruled out already-closed streams (asserted
// in debug builds).
void QuicSession::HandleFrameOnNonexistentOutgoingStream(
QuicStreamId stream_id) {
DCHECK(!IsClosedStream(stream_id));
// Received a frame for a locally-created stream that is not currently
// active. This is an error.
connection()->CloseConnection(
QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
}
// Handles a RST_STREAM for a stream that has no live stream object. If the
// stream is a closed one, the frame's byte offset is used as its final
// received offset for connection level flow control; otherwise nothing
// happens.
void QuicSession::HandleRstOnValidNonexistentStream(
    const QuicRstStreamFrame& frame) {
  // A stream that was neither active nor created by
  // GetOrCreateDynamicStream() may be a closed stream whose final received
  // byte offset still needs to be recorded.
  if (!IsClosedStream(frame.stream_id)) {
    return;
  }
  // The RST frame carries the stream's final byte offset, so the connection
  // level flow controller can now be brought up to date if needed.
  UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id,
                                             frame.byte_offset);
}
// Applies a new per-stream flow control send window received from the peer
// to every existing static and dynamic stream. A window below
// kMinimumFlowControlSendWindow is rejected: it is logged and, if the
// connection is still up, the connection is closed.
void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
  if (new_window >= kMinimumFlowControlSendWindow) {
    // Inform all existing streams about the new window.
    for (const auto& static_entry : static_stream_map_) {
      static_entry.second->UpdateSendWindowOffset(new_window);
    }
    for (const auto& dynamic_entry : dynamic_stream_map_) {
      dynamic_entry.second->UpdateSendWindowOffset(new_window);
    }
    return;
  }

  QUIC_LOG_FIRST_N(ERROR, 1)
      << "Peer sent us an invalid stream flow control send window: "
      << new_window << ", below default: " << kMinimumFlowControlSendWindow;
  if (connection_->connected()) {
    connection_->CloseConnection(
        QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
  }
}
// Applies a new session-level flow control send window received from the
// peer. A window below kMinimumFlowControlSendWindow is rejected: it is
// logged and, if the connection is still up, the connection is closed.
void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
  if (new_window >= kMinimumFlowControlSendWindow) {
    flow_controller_.UpdateSendWindowOffset(new_window);
    return;
  }

  QUIC_LOG_FIRST_N(ERROR, 1)
      << "Peer sent us an invalid session flow control send window: "
      << new_window << ", below default: " << kMinimumFlowControlSendWindow;
  if (connection_->connected()) {
    connection_->CloseConnection(
        QUIC_FLOW_CONTROL_INVALID_WINDOW, "New connection window too low",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
  }
}
// Reacts to progress in the crypto handshake.
//  - Encryption (re)established: streams blocked on encryption get a chance
//    to write; on re-establishment, unacked packets are retransmitted first.
//  - Handshake confirmed: unencrypted packets are neutered.
//  - Anything else is logged as an error.
void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
  switch (event) {
    // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
    // to QuicSession since it is the glue.
    case ENCRYPTION_FIRST_ESTABLISHED:
    case ENCRYPTION_REESTABLISHED:
      if (event == ENCRYPTION_REESTABLISHED) {
        // Retransmit the packets that were originally sent, since the peer
        // cannot decrypt them.
        connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
      }
      // Give any streams blocked by encryption a chance to write.
      OnCanWrite();
      break;

    case HANDSHAKE_CONFIRMED:
      QUIC_BUG_IF(!config_.negotiated())
          << ENDPOINT << "Handshake confirmed without parameter negotiation.";
      // Discard originally encrypted packets, since they can't be decrypted
      // by the peer.
      connection_->NeuterUnencryptedPackets();
      break;

    default:
      QUIC_LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
      break;
  }
}
// Notification that a crypto handshake message was sent; intentionally a
// no-op in this class.
void QuicSession::OnCryptoHandshakeMessageSent(
const CryptoHandshakeMessage& /*message*/) {}
// Notification that a crypto handshake message was received; intentionally a
// no-op in this class.
void QuicSession::OnCryptoHandshakeMessageReceived(
const CryptoHandshakeMessage& /*message*/) {}
// Returns a mutable pointer to this session's config. The pointer refers to
// a member of the session, so the session retains ownership.
QuicConfig* QuicSession::config() {
return &config_;
}
void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
QuicStreamId stream_id = stream->id();
QUIC_DLOG(INFO) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size()
<< ". activating " << stream_id;
DCHECK(!QuicContainsKey(dynamic_stream_map_, stream_id));
DCHECK(!QuicContainsKey(static_stream_map_, stream_id));
dynamic_stream_map_[stream_id] = std::move(stream);
if (IsIncomingStream(stream_id)) {
++num_dynamic_incoming_streams_;
}
// Increase the number of streams being emulated when a new one is opened.
connection_->SetNumOpenStreams(dynamic_stream_map_.size());
}
// Hands out the next locally initiated stream id and advances the counter by
// two, so consecutive ids keep the same parity.
QuicStreamId QuicSession::GetNextOutgoingStreamId() {
  const QuicStreamId allocated = next_outgoing_stream_id_;
  next_outgoing_stream_id_ = allocated + 2;
  return allocated;
}
// Looks |stream_id| up among the static streams first; anything else is
// delegated to GetOrCreateDynamicStream(), whose result (possibly null) is
// returned unchanged.
QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
  auto static_entry = static_stream_map_.find(stream_id);
  if (static_entry == static_stream_map_.end()) {
    return GetOrCreateDynamicStream(stream_id);
  }
  return static_entry->second;
}
// Marks dynamic stream |stream_id| as draining. Marking the same stream
// twice has no further effect. The incoming draining counter is bumped the
// first time a peer-initiated stream is marked.
void QuicSession::StreamDraining(QuicStreamId stream_id) {
  DCHECK(QuicContainsKey(dynamic_stream_map_, stream_id));
  if (QuicContainsKey(draining_streams_, stream_id)) {
    return;
  }
  draining_streams_.insert(stream_id);
  if (IsIncomingStream(stream_id)) {
    ++num_draining_incoming_streams_;
  }
}
bool QuicSession::MaybeIncreaseLargestPeerStreamId(
const QuicStreamId stream_id) {
if (stream_id <= largest_peer_created_stream_id_) {
return true;
}
// Check if the new number of available streams would cause the number of
// available streams to exceed the limit. Note that the peer can create
// only alternately-numbered streams.
size_t additional_available_streams =
(stream_id - largest_peer_created_stream_id_) / 2 - 1;
size_t new_num_available_streams =
GetNumAvailableStreams() + additional_available_streams;
if (new_num_available_streams > MaxAvailableStreams()) {
QUIC_DLOG(INFO) << ENDPOINT
<< "Failed to create a new incoming stream with id:"
<< stream_id << ". There are already "
<< GetNumAvailableStreams()
<< " streams available, which would become "
<< new_num_available_streams << ", which exceeds the limit "
<< MaxAvailableStreams() << ".";
connection()->CloseConnection(
QUIC_TOO_MANY_AVAILABLE_STREAMS,
QuicStrCat(new_num_available_streams, " above ", MaxAvailableStreams()),
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
return false;
}
for (QuicStreamId id = largest_peer_created_stream_id_ + 2; id < stream_id;
id += 2) {
available_streams_.insert(id);
}
largest_peer_created_stream_id_ = stream_id;
return true;
}
// Returns whether stream |stream_id| should yield to other streams. The
// stream that is currently being written never yields; for every other
// stream the decision is delegated to the write-blocked list.
bool QuicSession::ShouldYield(QuicStreamId stream_id) {
  const bool is_current_writer = (stream_id == currently_writing_stream_id_);
  return !is_current_writer && write_blocked_streams()->ShouldYield(stream_id);
}
// Returns the dynamic stream for |stream_id|, creating a peer-initiated
// stream on demand. Returns nullptr when:
//  - the stream is already closed;
//  - the id belongs to a locally initiated stream that is not active (the
//    connection is closed in that case);
//  - the peer stream id cannot be accepted (too many available streams);
//  - the open incoming stream limit is reached (the stream is refused with
//    a RST).
// Must not be called with a static stream id.
QuicStream* QuicSession::GetOrCreateDynamicStream(
    const QuicStreamId stream_id) {
  DCHECK(!QuicContainsKey(static_stream_map_, stream_id))
      << "Attempt to call GetOrCreateDynamicStream for a static stream";

  const auto existing = dynamic_stream_map_.find(stream_id);
  if (existing != dynamic_stream_map_.end()) {
    return existing->second.get();
  }
  if (IsClosedStream(stream_id)) {
    return nullptr;
  }
  if (!IsIncomingStream(stream_id)) {
    HandleFrameOnNonexistentOutgoingStream(stream_id);
    return nullptr;
  }

  // The id is about to be used, so it no longer counts as merely available.
  available_streams_.erase(stream_id);
  if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
    return nullptr;
  }

  // Only open the stream while there is room under the incoming limit.
  if (GetNumOpenIncomingStreams() < max_open_incoming_streams()) {
    return CreateIncomingDynamicStream(stream_id);
  }
  // Refuse to open the stream.
  SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
  return nullptr;
}
// Sets the limit on open peer-initiated streams. MaxAvailableStreams() is
// derived from this limit, so its new value is logged after the update.
void QuicSession::set_max_open_incoming_streams(
size_t max_open_incoming_streams) {
QUIC_DVLOG(1) << "Setting max_open_incoming_streams_ to "
<< max_open_incoming_streams;
max_open_incoming_streams_ = max_open_incoming_streams;
QUIC_DVLOG(1) << "MaxAvailableStreams() became " << MaxAvailableStreams();
}
// Sets the limit on open locally initiated streams and logs the new value.
void QuicSession::set_max_open_outgoing_streams(
size_t max_open_outgoing_streams) {
QUIC_DVLOG(1) << "Setting max_open_outgoing_streams_ to "
<< max_open_outgoing_streams;
max_open_outgoing_streams_ = max_open_outgoing_streams;
}
// Returns the connection's record of whether a GOAWAY has been sent.
bool QuicSession::goaway_sent() const {
return connection_->goaway_sent();
}
// Returns the connection's record of whether a GOAWAY has been received.
bool QuicSession::goaway_received() const {
return connection_->goaway_received();
}
// Returns true if stream |id| was opened at some point and is no longer
// active. |id| must be non-zero (asserted in debug builds).
bool QuicSession::IsClosedStream(QuicStreamId id) {
  DCHECK_NE(0u, id);
  if (IsOpenStream(id)) {
    // An active stream is by definition not closed.
    return false;
  }
  if (IsIncomingStream(id)) {
    // Peer created streams: ids up to the largest one seen count as closed,
    // except those still sitting in the available set.
    return id <= largest_peer_created_stream_id_ &&
           !QuicContainsKey(available_streams_, id);
  }
  // Locally created streams are strictly in-order, so an inactive id inside
  // the range already handed out must have been closed.
  return id < next_outgoing_stream_id_;
}
// Returns true if |id| is present in either the static or the dynamic
// stream map. |id| must be non-zero (asserted in debug builds).
bool QuicSession::IsOpenStream(QuicStreamId id) {
  DCHECK_NE(0u, id);
  return QuicContainsKey(static_stream_map_, id) ||
         QuicContainsKey(dynamic_stream_map_, id);
}
// Number of peer-initiated streams that count as open: dynamic incoming
// streams that are not draining, plus locally closed incoming streams still
// waiting for their final byte offset.
size_t QuicSession::GetNumOpenIncomingStreams() const {
  const auto non_draining_incoming =
      num_dynamic_incoming_streams_ - num_draining_incoming_streams_;
  return non_draining_incoming +
         num_locally_closed_incoming_streams_highest_offset_;
}
// Number of locally initiated streams that count as open: dynamic outgoing
// streams plus locally closed outgoing streams still waiting for their final
// byte offset, minus the outgoing streams that are draining. The CHECK
// guards the subtraction against underflow.
size_t QuicSession::GetNumOpenOutgoingStreams() const {
  CHECK_GE(GetNumDynamicOutgoingStreams() +
               GetNumLocallyClosedOutgoingStreamsHighestOffset(),
           GetNumDrainingOutgoingStreams());
  const size_t dynamic_outgoing = GetNumDynamicOutgoingStreams();
  const size_t awaiting_final_offset =
      GetNumLocallyClosedOutgoingStreamsHighestOffset();
  const size_t draining_outgoing = GetNumDrainingOutgoingStreams();
  return dynamic_outgoing + awaiting_final_offset - draining_outgoing;
}
// Number of dynamic streams that are not draining.
size_t QuicSession::GetNumActiveStreams() const {
  const auto total_dynamic = dynamic_stream_map_.size();
  const auto total_draining = draining_streams_.size();
  return total_dynamic - total_draining;
}
// Number of stream ids currently held in the available set.
size_t QuicSession::GetNumAvailableStreams() const {
return available_streams_.size();
}
// Adds stream |id| to the write-blocked list. The stream is added even when
// the lookup below fails; that case is only reported as a bug.
// NOTE(review): GetOrCreateStream may create a stream as a side effect, and
// whether it runs here depends on how QUIC_BUG_IF evaluates its condition --
// confirm before moving the call out of the macro.
void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
QUIC_BUG_IF(GetOrCreateStream(id) == nullptr) << "Marking unknown stream "
<< id << " blocked.";
write_blocked_streams_.AddStream(id);
}
// Returns true if anything is waiting to be written: a write-blocked crypto
// or headers stream, a write-blocked data stream, or data queued on the
// connection itself.
bool QuicSession::HasDataToWrite() const {
  if (write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream()) {
    return true;
  }
  if (write_blocked_streams_.HasWriteBlockedDataStreams()) {
    return true;
  }
  return connection_->HasQueuedData();
}
// Empties closed_streams_, the list that CloseStreamInner moves closed
// stream objects into.
void QuicSession::PostProcessAfterData() {
closed_streams_.clear();
}
// Number of locally initiated streams in the dynamic stream map, computed as
// the map size minus the incoming stream count.
size_t QuicSession::GetNumDynamicOutgoingStreams() const {
  DCHECK_GE(dynamic_stream_map_.size(), num_dynamic_incoming_streams_);
  const auto total_dynamic = dynamic_stream_map_.size();
  return total_dynamic - num_dynamic_incoming_streams_;
}
// Number of locally initiated streams that are draining, computed as the
// draining set size minus the incoming draining count.
size_t QuicSession::GetNumDrainingOutgoingStreams() const {
  DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
  const auto total_draining = draining_streams_.size();
  return total_draining - num_draining_incoming_streams_;
}
// Number of locally initiated, locally closed streams still waiting for
// their final byte offset, computed as the tracked total minus the incoming
// count.
size_t QuicSession::GetNumLocallyClosedOutgoingStreamsHighestOffset() const {
  DCHECK_GE(locally_closed_streams_highest_offset_.size(),
            num_locally_closed_incoming_streams_highest_offset_);
  const auto total_tracked = locally_closed_streams_highest_offset_.size();
  return total_tracked - num_locally_closed_incoming_streams_highest_offset_;
}
// Returns whether the session-level flow controller reports being blocked.
bool QuicSession::IsConnectionFlowControlBlocked() const {
return flow_controller_.IsBlocked();
}
// Returns true if the flow controller of any static or dynamic stream
// reports being blocked. Static streams are examined first.
bool QuicSession::IsStreamFlowControlBlocked() {
  for (const auto& static_entry : static_stream_map_) {
    const bool blocked = static_entry.second->flow_controller()->IsBlocked();
    if (blocked) {
      return true;
    }
  }
  for (const auto& dynamic_entry : dynamic_stream_map_) {
    const bool blocked = dynamic_entry.second->flow_controller()->IsBlocked();
    if (blocked) {
      return true;
    }
  }
  return false;
}
// Upper bound on the number of available streams: the open incoming stream
// limit scaled by kMaxAvailableStreamsMultiplier.
size_t QuicSession::MaxAvailableStreams() const {
return max_open_incoming_streams_ * kMaxAvailableStreamsMultiplier;
}
// A stream is peer-initiated when its id has the opposite parity from the
// ids this endpoint hands out for its own streams.
bool QuicSession::IsIncomingStream(QuicStreamId id) const {
  const auto id_parity = id % 2;
  const auto local_parity = next_outgoing_stream_id_ % 2;
  return id_parity != local_parity;
}
} // namespace net