// // // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include <inttypes.h> #include <stddef.h> #include <algorithm> #include <limits> #include <memory> #include <string> #include <utility> #include "absl/container/flat_hash_map.h" #include "absl/log/check.h" #include "absl/log/log.h" #include "absl/status/status.h" #include "absl/types/optional.h" #include <grpc/event_engine/event_engine.h> #include <grpc/slice_buffer.h> #include <grpc/support/port_platform.h> #include <grpc/support/time.h> #include "src/core/channelz/channelz.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/context_list_entry.h" #include "src/core/ext/transport/chttp2/transport/flow_control.h" #include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/frame_ping.h" #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h" #include "src/core/ext/transport/chttp2/transport/frame_settings.h" #include "src/core/ext/transport/chttp2/transport/frame_window_update.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/ext/transport/chttp2/transport/write_size_policy.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" #include "src/core/telemetry/call_tracer.h" #include "src/core/telemetry/stats.h" #include "src/core/telemetry/stats_data.h" #include "src/core/util/useful.h" // IWYU pragma: no_include "src/core/lib/gprpp/orphanable.h" static void add_to_write_list(grpc_chttp2_write_cb** list, grpc_chttp2_write_cb* cb) { cb->next = *list; *list = cb; } static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_write_cb* cb, grpc_error_handle error) { grpc_chttp2_complete_closure_step(t, &cb->closure, error, "finish_write_cb"); cb->next = t->write_cb_pool; t->write_cb_pool = cb; } static grpc_core::Duration NextAllowedPingInterval(grpc_chttp2_transport* t) { if (t->is_client) { return (t->keepalive_permit_without_calls == 0 && t->stream_map.empty()) ? grpc_core::Duration::Hours(2) : grpc_core::Duration::Seconds( 1); // A second is added to deal with // network delays and timing imprecision } if (t->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) { // The gRPC keepalive spec doesn't call for any throttling on the server // side, but we are adding some throttling for protection anyway, unless // we are doing a graceful GOAWAY in which case we don't want to wait. if (grpc_core::IsMultipingEnabled()) { return grpc_core::Duration::Seconds(1); } return t->keepalive_time == grpc_core::Duration::Infinity() ? grpc_core::Duration::Seconds(20) : t->keepalive_time / 2; } return grpc_core::Duration::Zero(); } static void maybe_initiate_ping(grpc_chttp2_transport* t) { if (!t->ping_callbacks.ping_requested()) { // no ping needed: wait return; } // InvalidateNow to avoid getting stuck re-initializing the ping timer // in a loop while draining the currently-held combiner. Also see // https://github.com/grpc/grpc/issues/26079. grpc_core::ExecCtx::Get()->InvalidateNow(); Match( t->ping_rate_policy.RequestSendPing(NextAllowedPingInterval(t), t->ping_callbacks.pings_inflight()), [t](grpc_core::Chttp2PingRatePolicy::SendGranted) { t->ping_rate_policy.SentPing(); const uint64_t id = t->ping_callbacks.StartPing(t->bitgen); grpc_slice_buffer_add(t->outbuf.c_slice_buffer(), grpc_chttp2_ping_create(false, id)); t->keepalive_incoming_data_wanted = true; if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } grpc_core::global_stats().IncrementHttp2PingsSent(); if (GRPC_TRACE_FLAG_ENABLED(http) || GRPC_TRACE_FLAG_ENABLED(bdp_estimator) || GRPC_TRACE_FLAG_ENABLED(http_keepalive) || GRPC_TRACE_FLAG_ENABLED(http2_ping)) { LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Ping " << id << " sent [" << std::string(t->peer_string.as_string_view()) << "]: " << t->ping_rate_policy.GetDebugString(); } }, [t](grpc_core::Chttp2PingRatePolicy::TooManyRecentPings) { // need to receive something of substance before sending a ping again if (GRPC_TRACE_FLAG_ENABLED(http) || GRPC_TRACE_FLAG_ENABLED(bdp_estimator) || GRPC_TRACE_FLAG_ENABLED(http_keepalive) || GRPC_TRACE_FLAG_ENABLED(http2_ping)) { LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Ping delayed [" << std::string(t->peer_string.as_string_view()) << "]: too many recent pings: " << t->ping_rate_policy.GetDebugString(); } }, [t](grpc_core::Chttp2PingRatePolicy::TooSoon too_soon) { // not enough elapsed time between successive pings if (GRPC_TRACE_FLAG_ENABLED(http) || GRPC_TRACE_FLAG_ENABLED(bdp_estimator) || GRPC_TRACE_FLAG_ENABLED(http_keepalive) || GRPC_TRACE_FLAG_ENABLED(http2_ping)) { LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Ping delayed [" << std::string(t->peer_string.as_string_view()) << "]: not enough time elapsed since last " "ping. Last ping:" << too_soon.last_ping << ", minimum wait:" << too_soon.next_allowed_ping_interval << ", need to wait:" << too_soon.wait; } if (t->delayed_ping_timer_handle == grpc_event_engine::experimental::EventEngine::TaskHandle:: kInvalid) { t->delayed_ping_timer_handle = t->event_engine->RunAfter( too_soon.wait, [t = t->Ref()]() mutable { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_chttp2_retry_initiate_ping(std::move(t)); }); } }); } static bool update_list(grpc_chttp2_transport* t, int64_t send_bytes, grpc_chttp2_write_cb** list, int64_t* ctr, grpc_error_handle error) { bool sched_any = false; grpc_chttp2_write_cb* cb = *list; *list = nullptr; *ctr += send_bytes; while (cb) { grpc_chttp2_write_cb* next = cb->next; if (cb->call_at_byte <= *ctr) { sched_any = true; finish_write_cb(t, cb, error); } else { add_to_write_list(list, cb); } cb = next; } return sched_any; } static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s, const char* staller) { GRPC_TRACE_VLOG(flowctl, 2) << t->peer_string.as_string_view() << ":" << t << " stream " << s->id << " moved to stalled list by " << staller << ". This is FULLY expected to happen in a healthy program that is not " "seeing flow control stalls. However, if you know that there are " "unwanted stalls, here is some helpful data: [fc:pending=" << s->flow_controlled_buffer.length << ":flowed=" << s->flow_controlled_bytes_flowed << ":peer_initwin=" << t->settings.acked().initial_window_size() << ":t_win=" << t->flow_control.remote_window() << ":s_win=" << static_cast<uint32_t>(std::max( int64_t{0}, s->flow_control.remote_window_delta() + static_cast<int64_t>( t->settings.peer().initial_window_size()))) << ":s_delta=" << s->flow_control.remote_window_delta() << "]"; } namespace { class CountDefaultMetadataEncoder { public: size_t count() const { return count_; } void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {} template <typename Which> void Encode(Which, const typename Which::ValueType&) { count_++; } private: size_t count_ = 0; }; } // namespace // Returns true if initial_metadata contains only default headers. static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) { CountDefaultMetadataEncoder enc; initial_metadata->Encode(&enc); return enc.count() == initial_metadata->count(); } namespace { class WriteContext { public: explicit WriteContext(grpc_chttp2_transport* t) : t_(t) { grpc_core::global_stats().IncrementHttp2WritesBegun(); } void FlushSettings() { auto update = t_->settings.MaybeSendUpdate(); if (update.has_value()) { grpc_core::Http2Frame frame(std::move(*update)); Serialize(absl::Span<grpc_core::Http2Frame>(&frame, 1), t_->outbuf); if (t_->keepalive_timeout != grpc_core::Duration::Infinity()) { CHECK( t_->settings_ack_watchdog == grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid); // We base settings timeout on keepalive timeout, but double it to allow // for implementations taking some more time about acking a setting. t_->settings_ack_watchdog = t_->event_engine->RunAfter( t_->settings_timeout, [t = t_->Ref()]() mutable { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_chttp2_settings_timeout(std::move(t)); }); } t_->flow_control.FlushedSettings(); grpc_core::global_stats().IncrementHttp2SettingsWrites(); } } void FlushQueuedBuffers() { // simple writes are queued to qbuf, and flushed here grpc_slice_buffer_move_into(&t_->qbuf, t_->outbuf.c_slice_buffer()); t_->num_pending_induced_frames = 0; CHECK_EQ(t_->qbuf.count, 0u); } void FlushWindowUpdates() { uint32_t transport_announce = t_->flow_control.MaybeSendUpdate( t_->outbuf.c_slice_buffer()->count > 0); if (transport_announce) { grpc_slice_buffer_add( t_->outbuf.c_slice_buffer(), grpc_chttp2_window_update_create(0, transport_announce, nullptr)); grpc_chttp2_reset_ping_clock(t_); } } void FlushPingAcks() { if (t_->ping_ack_count == 0) return; // Limit the size of writes if we include ping acks - to avoid the ack being // delayed by crypto operations. target_write_size_ = 0; for (size_t i = 0; i < t_->ping_ack_count; i++) { grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(), grpc_chttp2_ping_create(true, t_->ping_acks[i])); } t_->ping_ack_count = 0; } void EnactHpackSettings() { t_->hpack_compressor.SetMaxTableSize( t_->settings.peer().header_table_size()); } void UpdateStreamsNoLongerStalled() { grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { if (t_->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t_, s)) { if (!s->refcount->refs.RefIfNonZero()) { grpc_chttp2_list_remove_writable_stream(t_, s); } } } } grpc_chttp2_stream* NextStream() { if (t_->outbuf.c_slice_buffer()->length > target_write_size_) { result_.partial = true; return nullptr; } grpc_chttp2_stream* s; if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) { return nullptr; } return s; } void IncInitialMetadataWrites() { ++initial_metadata_writes_; } void IncWindowUpdateWrites() { ++flow_control_writes_; } void IncMessageWrites() { ++message_writes_; } void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; } void NoteScheduledResults() { result_.early_results_scheduled = true; } grpc_chttp2_transport* transport() const { return t_; } grpc_chttp2_begin_write_result Result() { result_.writing = t_->outbuf.c_slice_buffer()->count > 0; return result_; } size_t target_write_size() const { return target_write_size_; } private: grpc_chttp2_transport* const t_; size_t target_write_size_ = t_->write_size_policy.WriteTargetSize(); // stats histogram counters: we increment these throughout this function, // and at the end publish to the central stats histograms int flow_control_writes_ = 0; int initial_metadata_writes_ = 0; int trailing_metadata_writes_ = 0; int message_writes_ = 0; grpc_chttp2_begin_write_result result_ = {false, false, false}; }; class DataSendContext { public: DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t, grpc_chttp2_stream* s) : write_context_(write_context), t_(t), s_(s), sending_bytes_before_(s_->sending_bytes) {} uint32_t stream_remote_window() const { return static_cast<uint32_t>(std::max( int64_t{0}, s_->flow_control.remote_window_delta() + static_cast<int64_t>(t_->settings.peer().initial_window_size()))); } uint32_t max_outgoing() const { return grpc_core::Clamp<uint32_t>( std::min<int64_t>( {t_->settings.peer().max_frame_size(), stream_remote_window(), t_->flow_control.remote_window(), static_cast<int64_t>(write_context_->target_write_size())}), 0, std::numeric_limits<uint32_t>::max()); } bool AnyOutgoing() const { return max_outgoing() > 0; } void FlushBytes() { uint32_t send_bytes = static_cast<uint32_t>(std::min(static_cast<size_t>(max_outgoing()), s_->flow_controlled_buffer.length)); is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length && s_->send_trailing_metadata != nullptr && s_->send_trailing_metadata->empty(); grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes, is_last_frame_, &s_->call_tracer_wrapper, t_->outbuf.c_slice_buffer()); sfc_upd_.SentData(send_bytes); s_->sending_bytes += send_bytes; } bool is_last_frame() const { return is_last_frame_; } void CallCallbacks() { if (update_list( t_, static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_), &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed, absl::OkStatus())) { write_context_->NoteScheduledResults(); } } private: WriteContext* write_context_; grpc_chttp2_transport* t_; grpc_chttp2_stream* s_; grpc_core::chttp2::StreamFlowControl::OutgoingUpdateContext sfc_upd_{ &s_->flow_control}; const size_t sending_bytes_before_; bool is_last_frame_ = false; }; class StreamWriteContext { public: StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s) : write_context_(write_context), t_(write_context->transport()), s_(s) { GRPC_CHTTP2_IF_TRACING(INFO) << "W:" << t_ << " " << (t_->is_client ? "CLIENT" : "SERVER") << "[" << s->id << "] im-(sent,send)=(" << s->sent_initial_metadata << "," << (s->send_initial_metadata != nullptr) << ")"; } void FlushInitialMetadata() { // send initial metadata if it's available if (s_->sent_initial_metadata) return; if (s_->send_initial_metadata == nullptr) return; // We skip this on the server side if there is no custom initial // metadata, there are no messages to send, and we are also sending // trailing metadata. This results in a Trailers-Only response, // which is required for retries, as per: // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid if (!t_->is_client && s_->flow_controlled_buffer.length == 0 && s_->send_trailing_metadata != nullptr && is_default_initial_metadata(s_->send_initial_metadata)) { ConvertInitialMetadataToTrailingMetadata(); } else { t_->hpack_compressor.EncodeHeaders( grpc_core::HPackCompressor::EncodeHeaderOptions{ s_->id, // stream_id false, // is_eof t_->settings.peer() .allow_true_binary_metadata(), // use_true_binary_metadata t_->settings.peer().max_frame_size(), // max_frame_size &s_->call_tracer_wrapper}, *s_->send_initial_metadata, t_->outbuf.c_slice_buffer()); grpc_chttp2_reset_ping_clock(t_); write_context_->IncInitialMetadataWrites(); } s_->send_initial_metadata = nullptr; s_->sent_initial_metadata = true; write_context_->NoteScheduledResults(); grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished, absl::OkStatus(), "send_initial_metadata_finished"); if (!grpc_core::IsCallTracerInTransportEnabled()) { if (s_->call_tracer) { grpc_core::HttpAnnotation::WriteStats write_stats; write_stats.target_write_size = write_context_->target_write_size(); s_->call_tracer->RecordAnnotation( grpc_core::HttpAnnotation( grpc_core::HttpAnnotation::Type::kHeadWritten, gpr_now(GPR_CLOCK_REALTIME)) .Add(s_->t->flow_control.stats()) .Add(s_->flow_control.stats()) .Add(write_stats)); } } else if (grpc_core::IsTraceRecordCallopsEnabled()) { auto* call_tracer = s_->arena->GetContext<grpc_core::CallTracerInterface>(); if (call_tracer != nullptr && call_tracer->IsSampled()) { grpc_core::HttpAnnotation::WriteStats write_stats; write_stats.target_write_size = write_context_->target_write_size(); call_tracer->RecordAnnotation( grpc_core::HttpAnnotation( grpc_core::HttpAnnotation::Type::kHeadWritten, gpr_now(GPR_CLOCK_REALTIME)) .Add(s_->t->flow_control.stats()) .Add(s_->flow_control.stats()) .Add(write_stats)); } } } void FlushWindowUpdates() { if (s_->read_closed) return; // send any window updates const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate(); if (stream_announce == 0) return; grpc_slice_buffer_add( t_->outbuf.c_slice_buffer(), grpc_chttp2_window_update_create(s_->id, stream_announce, &s_->call_tracer_wrapper)); grpc_chttp2_reset_ping_clock(t_); write_context_->IncWindowUpdateWrites(); } void FlushData() { if (!s_->sent_initial_metadata) return; if (s_->flow_controlled_buffer.length == 0) { return; // early out: nothing to do } DataSendContext data_send_context(write_context_, t_, s_); if (!data_send_context.AnyOutgoing()) { if (t_->flow_control.remote_window() <= 0) { grpc_core::global_stats().IncrementHttp2TransportStalls(); report_stall(t_, s_, "transport"); grpc_chttp2_list_add_stalled_by_transport(t_, s_); } else if (data_send_context.stream_remote_window() <= 0) { grpc_core::global_stats().IncrementHttp2StreamStalls(); report_stall(t_, s_, "stream"); grpc_chttp2_list_add_stalled_by_stream(t_, s_); } return; // early out: nothing to do } while (s_->flow_controlled_buffer.length > 0 && data_send_context.max_outgoing() > 0) { data_send_context.FlushBytes(); } grpc_chttp2_reset_ping_clock(t_); if (data_send_context.is_last_frame()) { SentLastFrame(); } data_send_context.CallCallbacks(); stream_became_writable_ = true; if (s_->flow_controlled_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t_, s_); } write_context_->IncMessageWrites(); } void FlushTrailingMetadata() { if (!s_->sent_initial_metadata) return; if (s_->send_trailing_metadata == nullptr) return; if (s_->flow_controlled_buffer.length != 0) return; GRPC_CHTTP2_IF_TRACING(INFO) << "sending trailing_metadata"; if (s_->send_trailing_metadata->empty()) { grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true, &s_->call_tracer_wrapper, t_->outbuf.c_slice_buffer()); } else { t_->hpack_compressor.EncodeHeaders( grpc_core::HPackCompressor::EncodeHeaderOptions{ s_->id, true, t_->settings.peer().allow_true_binary_metadata(), t_->settings.peer().max_frame_size(), &s_->call_tracer_wrapper}, *s_->send_trailing_metadata, t_->outbuf.c_slice_buffer()); } write_context_->IncTrailingMetadataWrites(); grpc_chttp2_reset_ping_clock(t_); SentLastFrame(); write_context_->NoteScheduledResults(); grpc_chttp2_complete_closure_step(t_, &s_->send_trailing_metadata_finished, absl::OkStatus(), "send_trailing_metadata_finished"); } bool stream_became_writable() { return stream_became_writable_; } private: class TrailersOnlyMetadataEncoder { public: explicit TrailersOnlyMetadataEncoder(grpc_metadata_batch* trailing_md) : trailing_md_(trailing_md) {} template <typename Which, typename Value> void Encode(Which which, Value value) { if (Which::kTransferOnTrailersOnly) { trailing_md_->Set(which, value); } } template <typename Which> void Encode(Which which, const grpc_core::Slice& value) { if (Which::kTransferOnTrailersOnly) { trailing_md_->Set(which, value.Ref()); } } // Non-grpc metadata should not be transferred. void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {} private: grpc_metadata_batch* trailing_md_; }; void ConvertInitialMetadataToTrailingMetadata() { GRPC_CHTTP2_IF_TRACING(INFO) << "not sending initial_metadata (Trailers-Only)"; // When sending Trailers-Only, we need to move metadata from headers to // trailers. TrailersOnlyMetadataEncoder encoder(s_->send_trailing_metadata); s_->send_initial_metadata->Encode(&encoder); } void SentLastFrame() { s_->send_trailing_metadata = nullptr; if (s_->sent_trailing_metadata_op) { *s_->sent_trailing_metadata_op = true; s_->sent_trailing_metadata_op = nullptr; } s_->sent_trailing_metadata = true; s_->eos_sent = true; if (!t_->is_client && !s_->read_closed) { grpc_slice_buffer_add( t_->outbuf.c_slice_buffer(), grpc_chttp2_rst_stream_create(s_->id, GRPC_HTTP2_NO_ERROR, &s_->call_tracer_wrapper)); } grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true, absl::OkStatus()); if (!grpc_core::IsCallTracerInTransportEnabled()) { if (s_->call_tracer) { s_->call_tracer->RecordAnnotation( grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd, gpr_now(GPR_CLOCK_REALTIME)) .Add(s_->t->flow_control.stats()) .Add(s_->flow_control.stats())); } } else if (grpc_core::IsTraceRecordCallopsEnabled()) { auto* call_tracer = s_->arena->GetContext<grpc_core::CallTracerInterface>(); if (call_tracer != nullptr && call_tracer->IsSampled()) { call_tracer->RecordAnnotation( grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd, gpr_now(GPR_CLOCK_REALTIME)) .Add(s_->t->flow_control.stats()) .Add(s_->flow_control.stats())); } } } WriteContext* const write_context_; grpc_chttp2_transport* const t_; grpc_chttp2_stream* const s_; bool stream_became_writable_ = false; }; } // namespace grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_chttp2_transport* t) { GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_begin_write"); int64_t outbuf_relative_start_pos = 0; WriteContext ctx(t); ctx.FlushSettings(); ctx.FlushPingAcks(); ctx.FlushQueuedBuffers(); ctx.EnactHpackSettings(); if (t->flow_control.remote_window() > 0) { ctx.UpdateStreamsNoLongerStalled(); } // for each grpc_chttp2_stream that's become writable, frame it's data // (according to available window sizes) and add to the output buffer while (grpc_chttp2_stream* s = ctx.NextStream()) { StreamWriteContext stream_ctx(&ctx, s); size_t orig_len = t->outbuf.c_slice_buffer()->length; int64_t num_stream_bytes = 0; stream_ctx.FlushInitialMetadata(); stream_ctx.FlushWindowUpdates(); stream_ctx.FlushData(); stream_ctx.FlushTrailingMetadata(); if (t->outbuf.c_slice_buffer()->length > orig_len) { // Add this stream to the list of the contexts to be traced at TCP num_stream_bytes = t->outbuf.c_slice_buffer()->length - orig_len; s->byte_counter += static_cast<size_t>(num_stream_bytes); ++s->write_counter; if (s->traced && grpc_endpoint_can_track_err(t->ep.get())) { grpc_core::CopyContextFn copy_context_fn = grpc_core::GrpcHttp2GetCopyContextFn(); if (copy_context_fn != nullptr && grpc_core::GrpcHttp2GetWriteTimestampsCallback() != nullptr) { t->context_list->emplace_back(copy_context_fn(s->arena), outbuf_relative_start_pos, num_stream_bytes, s->byte_counter, s->write_counter - 1, s->tcp_tracer); } } outbuf_relative_start_pos += num_stream_bytes; } if (stream_ctx.stream_became_writable()) { if (!grpc_chttp2_list_add_writing_stream(t, s)) { // already in writing list: drop ref GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing"); } else { // ref will be dropped at end of write } } else { GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write"); } } ctx.FlushWindowUpdates(); maybe_initiate_ping(t); t->write_flow.Begin(GRPC_LATENT_SEE_METADATA("write")); return ctx.Result(); } void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) { GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_end_write"); grpc_chttp2_stream* s; t->write_flow.End(); if (t->channelz_socket != nullptr) { t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write); } t->num_messages_in_next_write = 0; if (t->ping_callbacks.started_new_ping_without_setting_timeout() && t->keepalive_timeout != grpc_core::Duration::Infinity()) { // Set ping timeout after finishing write so we don't measure our own send // time. const auto timeout = t->ping_timeout; auto id = t->ping_callbacks.OnPingTimeout( timeout, t->event_engine.get(), [t = t->Ref()] { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_chttp2_ping_timeout(t); }); if (GRPC_TRACE_FLAG_ENABLED(http2_ping) && id.has_value()) { LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Set ping timeout timer of " << timeout.ToString() << " for ping id " << id.value(); } if (t->keepalive_incoming_data_wanted && t->keepalive_timeout < t->ping_timeout && t->keepalive_ping_timeout_handle != grpc_event_engine::experimental::EventEngine::TaskHandle:: kInvalid) { if (GRPC_TRACE_FLAG_ENABLED(http2_ping) || GRPC_TRACE_FLAG_ENABLED(http_keepalive)) { LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Set keepalive ping timeout timer of " << t->keepalive_timeout.ToString(); } t->keepalive_ping_timeout_handle = t->event_engine->RunAfter(t->keepalive_timeout, [t = t->Ref()] { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_chttp2_keepalive_timeout(t); }); } } while (grpc_chttp2_list_pop_writing_stream(t, &s)) { if (s->sending_bytes != 0) { update_list(t, static_cast<int64_t>(s->sending_bytes), &s->on_write_finished_cbs, &s->flow_controlled_bytes_written, error); s->sending_bytes = 0; } GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end"); } grpc_slice_buffer_reset_and_unref(t->outbuf.c_slice_buffer()); }