/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* * Copyright 2020-2021 Couchbase, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "core/service_type_fmt.hxx" #include "core/tracing/constants.hxx" #include "core/utils/movable_function.hxx" #include "http_session.hxx" #include "http_traits.hxx" #include #include #include namespace couchbase::core::operations { using http_command_handler = utils::movable_function; template struct http_command : public std::enable_shared_from_this> { using encoded_request_type = typename Request::encoded_request_type; using encoded_response_type = typename Request::encoded_response_type; using error_context_type = typename Request::error_context_type; asio::steady_timer deadline; asio::steady_timer retry_backoff; Request request; encoded_request_type encoded; std::shared_ptr tracer_; std::shared_ptr span_{ nullptr }; std::shared_ptr meter_{}; std::shared_ptr session_{}; http_command_handler handler_{}; std::chrono::milliseconds timeout_{}; std::string client_context_id_; std::shared_ptr parent_span{ nullptr }; http_command(asio::io_context& ctx, Request req, std::shared_ptr tracer, std::shared_ptr meter, std::chrono::milliseconds default_timeout) : deadline(ctx) , retry_backoff(ctx) , request(req) , tracer_(std::move(tracer)) , meter_(std::move(meter)) , timeout_(request.timeout.value_or(default_timeout)) , client_context_id_(request.client_context_id.value_or(uuid::to_string(uuid::random()))) { if constexpr (io::http_traits::supports_parent_span_v) { parent_span = request.parent_span; } } void finish_dispatch(const std::string& remote_address, const std::string& local_address) { if (span_ == nullptr) { return; } span_->add_tag(tracing::attributes::remote_socket, remote_address); span_->add_tag(tracing::attributes::local_socket, local_address); span_->end(); span_ = nullptr; } void start(http_command_handler&& handler) { span_ = tracer_->start_span(tracing::span_name_for_http_service(request.type), parent_span); span_->add_tag(tracing::attributes::service, tracing::service_name_for_http_service(request.type)); span_->add_tag(tracing::attributes::operation_id, client_context_id_); handler_ = std::move(handler); deadline.expires_after(timeout_); deadline.async_wait([self = this->shared_from_this()](std::error_code ec) { if (ec == asio::error::operation_aborted) { return; } self->cancel(); }); } void cancel() { if (session_) { session_->stop(); } invoke_handler(errc::common::unambiguous_timeout, {}); } void invoke_handler(std::error_code ec, io::http_response&& msg) { if (span_ != nullptr) { span_->end(); span_ = nullptr; } if (handler_) { handler_(ec, std::move(msg)); } handler_ = nullptr; retry_backoff.cancel(); deadline.cancel(); } void send_to(std::shared_ptr session) { if (!handler_) { return; } session_ = std::move(session); span_->add_tag(tracing::attributes::local_id, session_->id()); send(); } private: void send() { encoded.type = request.type; encoded.client_context_id = client_context_id_; encoded.timeout = timeout_; if (auto ec = request.encode_to(encoded, session_->http_context()); ec) { return invoke_handler(ec, {}); } encoded.headers["client-context-id"] = client_context_id_; CB_LOG_TRACE(R"({} HTTP request: {}, method={}, path="{}", client_context_id="{}", timeout={}ms)", session_->log_prefix(), encoded.type, encoded.method, encoded.path, client_context_id_, timeout_.count()); session_->write_and_subscribe( encoded, [self = this->shared_from_this(), start = std::chrono::steady_clock::now()](std::error_code ec, io::http_response&& msg) { if (ec == asio::error::operation_aborted) { return self->invoke_handler(errc::common::ambiguous_timeout, std::move(msg)); } static std::string meter_name = "db.couchbase.operations"; static std::map tags = { { "db.couchbase.service", fmt::format("{}", self->request.type) }, { "db.operation", self->encoded.path }, }; if (self->meter_) { self->meter_->get_value_recorder(meter_name, tags) ->record_value(std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count()); } self->deadline.cancel(); self->finish_dispatch(self->session_->remote_address(), self->session_->local_address()); CB_LOG_TRACE(R"({} HTTP response: {}, client_context_id="{}", status={}, body={})", self->session_->log_prefix(), self->request.type, self->client_context_id_, msg.status_code, msg.status_code == 200 ? "[hidden]" : msg.body.data()); if (auto parser_ec = msg.body.ec(); !ec && parser_ec) { ec = parser_ec; } try { self->invoke_handler(ec, std::move(msg)); } catch (const priv::retry_http_request&) { self->send(); } }); } }; } // namespace couchbase::core::operations