// // Copyright 2018 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include "src/core/client_channel/subchannel_stream_client.h" #include #include #include #include "absl/log/check.h" #include "absl/log/log.h" #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/util/time_precise.h" #define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120 #define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2 namespace grpc_core { using ::grpc_event_engine::experimental::EventEngine; // // SubchannelStreamClient // SubchannelStreamClient::SubchannelStreamClient( RefCountedPtr connected_subchannel, grpc_pollset_set* interested_parties, std::unique_ptr event_handler, const char* tracer) : InternallyRefCounted(tracer), connected_subchannel_(std::move(connected_subchannel)), interested_parties_(interested_parties), tracer_(tracer), call_allocator_(MakeRefCounted( connected_subchannel_->args() .GetObject() ->memory_quota() ->CreateMemoryAllocator( (tracer != nullptr) ? tracer : "SubchannelStreamClient"), 1024)), event_handler_(std::move(event_handler)), retry_backoff_( BackOff::Options() .set_initial_backoff(Duration::Seconds( SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS)) .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER) .set_max_backoff(Duration::Seconds( SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))), event_engine_(connected_subchannel_->args().GetObject()) { if (GPR_UNLIKELY(tracer_ != nullptr)) { LOG(INFO) << tracer_ << " " << this << ": created SubchannelStreamClient"; } StartCall(); } SubchannelStreamClient::~SubchannelStreamClient() { if (GPR_UNLIKELY(tracer_ != nullptr)) { LOG(INFO) << tracer_ << " " << this << ": destroying SubchannelStreamClient"; } } void SubchannelStreamClient::Orphan() { if (GPR_UNLIKELY(tracer_ != nullptr)) { LOG(INFO) << tracer_ << " " << this << ": SubchannelStreamClient shutting down"; } { MutexLock lock(&mu_); event_handler_.reset(); call_state_.reset(); if (retry_timer_handle_.has_value()) { event_engine_->Cancel(*retry_timer_handle_); retry_timer_handle_.reset(); } } Unref(DEBUG_LOCATION, "orphan"); } void SubchannelStreamClient::StartCall() { MutexLock lock(&mu_); StartCallLocked(); } void SubchannelStreamClient::StartCallLocked() { if (event_handler_ == nullptr) return; CHECK(call_state_ == nullptr); if (event_handler_ != nullptr) { event_handler_->OnCallStartLocked(this); } call_state_ = MakeOrphanable(Ref(), interested_parties_); if (GPR_UNLIKELY(tracer_ != nullptr)) { LOG(INFO) << tracer_ << " " << this << ": SubchannelStreamClient created CallState " << call_state_.get(); } call_state_->StartCallLocked(); } void SubchannelStreamClient::StartRetryTimerLocked() { if (event_handler_ != nullptr) { event_handler_->OnRetryTimerStartLocked(this); } const Duration timeout = retry_backoff_.NextAttemptDelay(); if (GPR_UNLIKELY(tracer_ != nullptr)) { LOG(INFO) << tracer_ << " " << this << ": SubchannelStreamClient health check call lost..."; if (timeout > Duration::Zero()) { LOG(INFO) << tracer_ << " " << this << ": ... will retry in " << timeout.millis() << "ms."; } else { LOG(INFO) << tracer_ << " " << this << ": ... retrying immediately."; } } retry_timer_handle_ = event_engine_->RunAfter( timeout, [self = Ref(DEBUG_LOCATION, "health_retry_timer")]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; self->OnRetryTimer(); self.reset(DEBUG_LOCATION, "health_retry_timer"); }); } void SubchannelStreamClient::OnRetryTimer() { MutexLock lock(&mu_); if (event_handler_ != nullptr && retry_timer_handle_.has_value() && call_state_ == nullptr) { if (GPR_UNLIKELY(tracer_ != nullptr)) { LOG(INFO) << tracer_ << " " << this << ": SubchannelStreamClient restarting health check call"; } StartCallLocked(); } retry_timer_handle_.reset(); } // // SubchannelStreamClient::CallState // SubchannelStreamClient::CallState::CallState( RefCountedPtr health_check_client, grpc_pollset_set* interested_parties) : subchannel_stream_client_(std::move(health_check_client)), pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)), arena_(subchannel_stream_client_->call_allocator_->MakeArena()) {} SubchannelStreamClient::CallState::~CallState() { if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) { LOG(INFO) << subchannel_stream_client_->tracer_ << " " << subchannel_stream_client_.get() << ": SubchannelStreamClient destroying CallState " << this; } // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if // any, so that it can release any internal references it may be // holding to the call stack. call_combiner_.SetNotifyOnCancel(nullptr); } void SubchannelStreamClient::CallState::Orphan() { call_combiner_.Cancel(absl::CancelledError()); Cancel(); } void SubchannelStreamClient::CallState::StartCallLocked() { SubchannelCall::Args args = { subchannel_stream_client_->connected_subchannel_, &pollent_, Slice::FromStaticString("/grpc.health.v1.Health/Watch"), gpr_get_cycle_counter(), // start_time Timestamp::InfFuture(), // deadline arena_.get(), &call_combiner_, }; grpc_error_handle error; call_ = SubchannelCall::Create(std::move(args), &error).release(); // Register after-destruction callback. GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction, this, grpc_schedule_on_exec_ctx); call_->SetAfterCallStackDestroy(&after_call_stack_destruction_); // Check if creation failed. if (!error.ok() || subchannel_stream_client_->event_handler_ == nullptr) { LOG(ERROR) << "SubchannelStreamClient " << subchannel_stream_client_.get() << " CallState " << this << ": error creating " << "stream on subchannel (" << StatusToString(error) << "); will retry"; CallEndedLocked(/*retry=*/true); return; } // Initialize payload and batch. batch_.payload = &payload_; // on_complete callback takes ref, handled manually. call_->Ref(DEBUG_LOCATION, "on_complete").release(); batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this, grpc_schedule_on_exec_ctx); // Add send_initial_metadata op. send_initial_metadata_.Set( HttpPathMetadata(), subchannel_stream_client_->event_handler_->GetPathLocked()); CHECK(error.ok()); payload_.send_initial_metadata.send_initial_metadata = &send_initial_metadata_; batch_.send_initial_metadata = true; // Add send_message op. send_message_.Append(Slice( subchannel_stream_client_->event_handler_->EncodeSendMessageLocked())); payload_.send_message.send_message = &send_message_; batch_.send_message = true; // Add send_trailing_metadata op. payload_.send_trailing_metadata.send_trailing_metadata = &send_trailing_metadata_; batch_.send_trailing_metadata = true; // Add recv_initial_metadata op. payload_.recv_initial_metadata.recv_initial_metadata = &recv_initial_metadata_; payload_.recv_initial_metadata.trailing_metadata_available = nullptr; // recv_initial_metadata_ready callback takes ref, handled manually. call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release(); payload_.recv_initial_metadata.recv_initial_metadata_ready = GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx); batch_.recv_initial_metadata = true; // Add recv_message op. payload_.recv_message.recv_message = &recv_message_; payload_.recv_message.call_failed_before_recv_message = nullptr; // recv_message callback takes ref, handled manually. call_->Ref(DEBUG_LOCATION, "recv_message_ready").release(); payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT( &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx); batch_.recv_message = true; // Start batch. StartBatch(&batch_); // Initialize recv_trailing_metadata batch. recv_trailing_metadata_batch_.payload = &payload_; // Add recv_trailing_metadata op. payload_.recv_trailing_metadata.recv_trailing_metadata = &recv_trailing_metadata_; payload_.recv_trailing_metadata.collect_stats = &collect_stats_; // This callback signals the end of the call, so it relies on the // initial ref instead of taking a new ref. When it's invoked, the // initial ref is released. payload_.recv_trailing_metadata.recv_trailing_metadata_ready = GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx); recv_trailing_metadata_batch_.recv_trailing_metadata = true; // Start recv_trailing_metadata batch. StartBatch(&recv_trailing_metadata_batch_); } void SubchannelStreamClient::CallState::StartBatchInCallCombiner( void* arg, grpc_error_handle /*error*/) { auto* batch = static_cast(arg); auto* call = static_cast(batch->handler_private.extra_arg); call->StartTransportStreamOpBatch(batch); } void SubchannelStreamClient::CallState::StartBatch( grpc_transport_stream_op_batch* batch) { batch->handler_private.extra_arg = call_; GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, batch, grpc_schedule_on_exec_ctx); GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure, absl::OkStatus(), "start_subchannel_batch"); } void SubchannelStreamClient::CallState::AfterCallStackDestruction( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); delete self; } void SubchannelStreamClient::CallState::OnCancelComplete( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel"); self->call_->Unref(DEBUG_LOCATION, "cancel"); } void SubchannelStreamClient::CallState::StartCancel( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); auto* batch = grpc_make_transport_stream_op( GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx)); batch->cancel_stream = true; batch->payload->cancel_stream.cancel_error = absl::CancelledError(); self->call_->StartTransportStreamOpBatch(batch); } void SubchannelStreamClient::CallState::Cancel() { bool expected = false; if (cancelled_.compare_exchange_strong(expected, true, std::memory_order_acq_rel, std::memory_order_acquire)) { call_->Ref(DEBUG_LOCATION, "cancel").release(); GRPC_CALL_COMBINER_START( &call_combiner_, GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx), absl::OkStatus(), "health_cancel"); } } void SubchannelStreamClient::CallState::OnComplete( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete"); self->send_initial_metadata_.Clear(); self->send_trailing_metadata_.Clear(); self->call_->Unref(DEBUG_LOCATION, "on_complete"); } void SubchannelStreamClient::CallState::RecvInitialMetadataReady( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready"); self->recv_initial_metadata_.Clear(); self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready"); } void SubchannelStreamClient::CallState::RecvMessageReady() { if (!recv_message_.has_value()) { call_->Unref(DEBUG_LOCATION, "recv_message_ready"); return; } // Report payload. { MutexLock lock(&subchannel_stream_client_->mu_); if (subchannel_stream_client_->event_handler_ != nullptr) { absl::Status status = subchannel_stream_client_->event_handler_->RecvMessageReadyLocked( subchannel_stream_client_.get(), recv_message_->JoinIntoString()); if (!status.ok()) { if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) { LOG(INFO) << subchannel_stream_client_->tracer_ << " " << subchannel_stream_client_.get() << ": SubchannelStreamClient CallState " << this << ": failed to parse response message: " << status; } Cancel(); } } } seen_response_.store(true, std::memory_order_release); recv_message_.reset(); // Start another recv_message batch. // This re-uses the ref we're holding. // Note: Can't just reuse batch_ here, since we don't know that all // callbacks from the original batch have completed yet. recv_message_batch_.payload = &payload_; payload_.recv_message.recv_message = &recv_message_; payload_.recv_message.call_failed_before_recv_message = nullptr; payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT( &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx); recv_message_batch_.recv_message = true; StartBatch(&recv_message_batch_); } void SubchannelStreamClient::CallState::RecvMessageReady( void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready"); self->RecvMessageReady(); } void SubchannelStreamClient::CallState::RecvTrailingMetadataReady( void* arg, grpc_error_handle error) { auto* self = static_cast(arg); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_trailing_metadata_ready"); // Get call status. grpc_status_code status = self->recv_trailing_metadata_.get(GrpcStatusMetadata()) .value_or(GRPC_STATUS_UNKNOWN); if (!error.ok()) { grpc_error_get_status(error, Timestamp::InfFuture(), &status, nullptr /* slice */, nullptr /* http_error */, nullptr /* error_string */); } if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) { LOG(INFO) << self->subchannel_stream_client_->tracer_ << " " << self->subchannel_stream_client_.get() << ": SubchannelStreamClient CallState " << self << ": health watch failed with status " << status; } // Clean up. self->recv_trailing_metadata_.Clear(); // Report call end. MutexLock lock(&self->subchannel_stream_client_->mu_); if (self->subchannel_stream_client_->event_handler_ != nullptr) { self->subchannel_stream_client_->event_handler_ ->RecvTrailingMetadataReadyLocked(self->subchannel_stream_client_.get(), status); } // For status UNIMPLEMENTED, give up and assume always healthy. self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED); } void SubchannelStreamClient::CallState::CallEndedLocked(bool retry) { // If this CallState is still in use, this call ended because of a failure, // so we need to stop using it and optionally create a new one. // Otherwise, we have deliberately ended this call, and no further action // is required. if (this == subchannel_stream_client_->call_state_.get()) { subchannel_stream_client_->call_state_.reset(); if (retry) { CHECK(subchannel_stream_client_->event_handler_ != nullptr); if (seen_response_.load(std::memory_order_acquire)) { // If the call fails after we've gotten a successful response, reset // the backoff and restart the call immediately. subchannel_stream_client_->retry_backoff_.Reset(); subchannel_stream_client_->StartCallLocked(); } else { // If the call failed without receiving any messages, retry later. subchannel_stream_client_->StartRetryTimerLocked(); } } } // When the last ref to the call stack goes away, the CallState object // will be automatically destroyed. call_->Unref(DEBUG_LOCATION, "call_ended"); } } // namespace grpc_core