// // Copyright 2016 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include "src/core/ext/filters/message_size/message_size_filter.h" #include #include #include "absl/strings/str_format.h" #include #include #include #include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/service_config_call_data.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" static void recv_message_ready(void* user_data, grpc_error_handle error); static void recv_trailing_metadata_ready(void* user_data, grpc_error_handle error); namespace grpc_core { namespace { size_t g_message_size_parser_index; } // namespace // // MessageSizeParsedConfig // const MessageSizeParsedConfig* MessageSizeParsedConfig::GetFromCallContext( const grpc_call_context_element* context) { if (context == nullptr) return nullptr; auto* svc_cfg_call_data = static_cast( context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); if (svc_cfg_call_data == nullptr) return nullptr; return static_cast( svc_cfg_call_data->GetMethodParsedConfig( MessageSizeParser::ParserIndex())); } // // MessageSizeParser // std::unique_ptr MessageSizeParser::ParsePerMethodParams(const grpc_channel_args* /*args*/, const Json& json, grpc_error_handle* error) { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); std::vector error_list; // Max request size. int max_request_message_bytes = -1; auto it = json.object_value().find("maxRequestMessageBytes"); if (it != json.object_value().end()) { if (it->second.type() != Json::Type::STRING && it->second.type() != Json::Type::NUMBER) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:maxRequestMessageBytes error:should be of type number")); } else { max_request_message_bytes = gpr_parse_nonnegative_int(it->second.string_value().c_str()); if (max_request_message_bytes == -1) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:maxRequestMessageBytes error:should be non-negative")); } } } // Max response size. int max_response_message_bytes = -1; it = json.object_value().find("maxResponseMessageBytes"); if (it != json.object_value().end()) { if (it->second.type() != Json::Type::STRING && it->second.type() != Json::Type::NUMBER) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:maxResponseMessageBytes error:should be of type number")); } else { max_response_message_bytes = gpr_parse_nonnegative_int(it->second.string_value().c_str()); if (max_response_message_bytes == -1) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:maxResponseMessageBytes error:should be non-negative")); } } } if (!error_list.empty()) { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Message size parser", &error_list); return nullptr; } return absl::make_unique(max_request_message_bytes, max_response_message_bytes); } void MessageSizeParser::Register() { g_message_size_parser_index = ServiceConfigParser::RegisterParser( absl::make_unique()); } size_t MessageSizeParser::ParserIndex() { return g_message_size_parser_index; } int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args) { if (grpc_channel_args_want_minimal_stack(args)) return -1; return grpc_channel_args_find_integer( args, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, {GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX}); } int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args) { if (grpc_channel_args_want_minimal_stack(args)) return -1; return grpc_channel_args_find_integer( args, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, {GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX}); } } // namespace grpc_core namespace { struct channel_data { grpc_core::MessageSizeParsedConfig::message_size_limits limits; }; struct call_data { call_data(grpc_call_element* elem, const channel_data& chand, const grpc_call_element_args& args) : call_combiner(args.call_combiner), limits(chand.limits) { GRPC_CLOSURE_INIT(&recv_message_ready, ::recv_message_ready, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, ::recv_trailing_metadata_ready, elem, grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response // size to the receive limit. const grpc_core::MessageSizeParsedConfig* limits = grpc_core::MessageSizeParsedConfig::GetFromCallContext(args.context); if (limits != nullptr) { if (limits->limits().max_send_size >= 0 && (limits->limits().max_send_size < this->limits.max_send_size || this->limits.max_send_size < 0)) { this->limits.max_send_size = limits->limits().max_send_size; } if (limits->limits().max_recv_size >= 0 && (limits->limits().max_recv_size < this->limits.max_recv_size || this->limits.max_recv_size < 0)) { this->limits.max_recv_size = limits->limits().max_recv_size; } } } ~call_data() { GRPC_ERROR_UNREF(error); } grpc_core::CallCombiner* call_combiner; grpc_core::MessageSizeParsedConfig::message_size_limits limits; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; grpc_closure recv_trailing_metadata_ready; // The error caused by a message that is too large, or GRPC_ERROR_NONE grpc_error_handle error = GRPC_ERROR_NONE; // Used by recv_message_ready. grpc_core::OrphanablePtr* recv_message = nullptr; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready = nullptr; // Original recv_trailing_metadata callback, invoked after our own. grpc_closure* original_recv_trailing_metadata_ready; bool seen_recv_trailing_metadata = false; grpc_error_handle recv_trailing_metadata_error; }; } // namespace // Callback invoked when we receive a message. Here we check the max // receive message size. static void recv_message_ready(void* user_data, grpc_error_handle error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); if (*calld->recv_message != nullptr && calld->limits.max_recv_size >= 0 && (*calld->recv_message)->length() > static_cast(calld->limits.max_recv_size)) { grpc_error_handle new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrFormat("Received message larger than max (%u vs. %d)", (*calld->recv_message)->length(), calld->limits.max_recv_size) .c_str()), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); error = grpc_error_add_child(GRPC_ERROR_REF(error), new_error); GRPC_ERROR_UNREF(calld->error); calld->error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } // Invoke the next callback. grpc_closure* closure = calld->next_recv_message_ready; calld->next_recv_message_ready = nullptr; if (calld->seen_recv_trailing_metadata) { /* We might potentially see another RECV_MESSAGE op. In that case, we do not * want to run the recv_trailing_metadata_ready closure again. The newer * RECV_MESSAGE op cannot cause any errors since the transport has already * invoked the recv_trailing_metadata_ready closure and all further * RECV_MESSAGE ops will get null payloads. */ calld->seen_recv_trailing_metadata = false; GRPC_CALL_COMBINER_START(calld->call_combiner, &calld->recv_trailing_metadata_ready, calld->recv_trailing_metadata_error, "continue recv_trailing_metadata_ready"); } grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); } // Callback invoked on completion of recv_trailing_metadata // Notifies the recv_trailing_metadata batch of any message size failures static void recv_trailing_metadata_ready(void* user_data, grpc_error_handle error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); if (calld->next_recv_message_ready != nullptr) { calld->seen_recv_trailing_metadata = true; calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); GRPC_CALL_COMBINER_STOP(calld->call_combiner, "deferring recv_trailing_metadata_ready until " "after recv_message_ready"); return; } error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready, error); } // Start transport stream op. static void message_size_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { call_data* calld = static_cast(elem->call_data); // Check max send message size. if (op->send_message && calld->limits.max_send_size >= 0 && op->payload->send_message.send_message->length() > static_cast(calld->limits.max_send_size)) { grpc_transport_stream_op_batch_finish_with_failure( op, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrFormat( "Sent message larger than max (%u vs. %d)", op->payload->send_message.send_message->length(), calld->limits.max_send_size) .c_str()), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED), calld->call_combiner); return; } // Inject callback for receiving a message. if (op->recv_message) { calld->next_recv_message_ready = op->payload->recv_message.recv_message_ready; calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } // Inject callback for receiving trailing metadata. if (op->recv_trailing_metadata) { calld->original_recv_trailing_metadata_ready = op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = &calld->recv_trailing_metadata_ready; } // Chain to the next filter. grpc_call_next_op(elem, op); } // Constructor for call_data. static grpc_error_handle message_size_init_call_elem( grpc_call_element* elem, const grpc_call_element_args* args) { channel_data* chand = static_cast(elem->channel_data); new (elem->call_data) call_data(elem, *chand, *args); return GRPC_ERROR_NONE; } // Destructor for call_data. static void message_size_destroy_call_elem( grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, grpc_closure* /*ignored*/) { call_data* calld = static_cast(elem->call_data); calld->~call_data(); } grpc_core::MessageSizeParsedConfig::message_size_limits get_message_size_limits( const grpc_channel_args* channel_args) { grpc_core::MessageSizeParsedConfig::message_size_limits lim; lim.max_send_size = grpc_core::GetMaxSendSizeFromChannelArgs(channel_args); lim.max_recv_size = grpc_core::GetMaxRecvSizeFromChannelArgs(channel_args); return lim; } // Constructor for channel_data. static grpc_error_handle message_size_init_channel_elem( grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); channel_data* chand = static_cast(elem->channel_data); new (chand) channel_data(); chand->limits = get_message_size_limits(args->channel_args); return GRPC_ERROR_NONE; } // Destructor for channel_data. static void message_size_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast(elem->channel_data); chand->~channel_data(); } const grpc_channel_filter grpc_message_size_filter = { message_size_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), message_size_init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, message_size_destroy_call_elem, sizeof(channel_data), message_size_init_channel_elem, message_size_destroy_channel_elem, grpc_channel_next_get_info, "message_size"}; // Used for GRPC_CLIENT_SUBCHANNEL static bool maybe_add_message_size_filter_subchannel( grpc_channel_stack_builder* builder, void* /*arg*/) { const grpc_channel_args* channel_args = grpc_channel_stack_builder_get_channel_arguments(builder); if (grpc_channel_args_want_minimal_stack(channel_args)) { return true; } return grpc_channel_stack_builder_prepend_filter( builder, &grpc_message_size_filter, nullptr, nullptr); } // Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the filter // only if message size limits or service config is specified. static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder, void* /*arg*/) { const grpc_channel_args* channel_args = grpc_channel_stack_builder_get_channel_arguments(builder); if (grpc_channel_args_want_minimal_stack(channel_args)) { return true; } bool enable = false; grpc_core::MessageSizeParsedConfig::message_size_limits lim = get_message_size_limits(channel_args); if (lim.max_send_size != -1 || lim.max_recv_size != -1) { enable = true; } const grpc_arg* a = grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG); const char* svc_cfg_str = grpc_channel_arg_get_string(a); if (svc_cfg_str != nullptr) { enable = true; } if (enable) { return grpc_channel_stack_builder_prepend_filter( builder, &grpc_message_size_filter, nullptr, nullptr); } else { return true; } } void grpc_message_size_filter_init(void) { grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_message_size_filter_subchannel, nullptr); grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_message_size_filter, nullptr); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_message_size_filter, nullptr); grpc_core::MessageSizeParser::Register(); } void grpc_message_size_filter_shutdown(void) {}