require "logstash/outputs/elasticsearch/template_manager"

module LogStash; module PluginMixins; module ElasticSearch
  # Common methods that can be reused by alternate elasticsearch output plugins
  # such as the elasticsearch_data_streams output.
  #
  # The including class is expected to provide the names this module reads but
  # does not define: `params`, `metric`, `logger`, `failure_type_logging_whitelist`
  # and the ivars `@logger`, `@stopping`, `@retry_initial_interval`,
  # `@retry_max_interval`, `@bulk_request_metrics`, `@document_level_metrics`
  # (plus the optional `@dlq_writer`).
  module Common

    attr_reader :client, :hosts

    # These codes apply to documents, not at the request level
    DOC_DLQ_CODES = [400, 404].freeze
    DOC_SUCCESS_CODES = [200, 201].freeze
    DOC_CONFLICT_CODE = 409

    # Perform some ES options validations and build the HttpClient.
    # Note that this method may set the @user, @password, @hosts and @client ivars as a side effect.
    # @param license_checker [#appropriate_license?] An optional license checker that will be used by the Pool class.
    # @return [HttpClient] the new http client (memoized in @client)
    def build_client(license_checker = nil)
      params["license_checker"] = license_checker

      # the following 3 options validation & setup methods are called inside build_client
      # because they must be executed prior to building the client and logstash
      # monitoring and management rely on directly calling build_client
      # see https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/934#pullrequestreview-396203307
      validate_authentication
      fill_hosts_from_cloud_id
      setup_hosts

      params["metric"] = metric
      if @proxy.eql?('')
        @logger.warn "Supplied proxy setting (proxy => '') has no effect"
      end
      @client ||= ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
    end

    # Ensures at most one authentication mechanism (user/password, cloud_auth or
    # api_key) is configured, that api_key is only used together with `ssl => true`,
    # and expands cloud_auth into @user/@password (also copied into `params`).
    # @raise [LogStash::ConfigurationError] when the authentication settings are inconsistent
    def validate_authentication
      authn_options = 0
      authn_options += 1 if @cloud_auth
      authn_options += 1 if (@api_key && @api_key.value)
      authn_options += 1 if (@user || (@password && @password.value))

      if authn_options > 1
        raise LogStash::ConfigurationError, 'Multiple authentication options are specified, please only use one of user/password, cloud_auth or api_key'
      end

      if @api_key && @api_key.value && @ssl != true
        raise(LogStash::ConfigurationError, "Using api_key authentication requires SSL/TLS secured communication using the `ssl => true` option")
      end

      if @cloud_auth
        @user, @password = parse_user_password_from_cloud_auth(@cloud_auth)
        # params is the plugin global params hash which will be passed to HttpClientBuilder.build
        params['user'], params['password'] = @user, @password
      end
    end
    private :validate_authentication

    # Normalizes @hosts to an Array, falling back to ["localhost"] when it is empty.
    def setup_hosts
      @hosts = Array(@hosts)
      if @hosts.empty?
        @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
        @hosts.replace(["localhost"])
      end
    end

    # @param hosts [Object] the configured hosts value
    # @return [Boolean] true when hosts is a one-element Array whose element is the
    #   very same object (identity, not equality) as APIConfigs::DEFAULT_HOST
    def hosts_default?(hosts)
      # NOTE: would be nice if pipeline allowed us a clean way to detect a config default :
      hosts.is_a?(Array) && hosts.size == 1 && hosts.first.equal?(LogStash::PluginMixins::ElasticSearch::APIConfigs::DEFAULT_HOST)
    end
    private :hosts_default?

    # When cloud_id is configured, derives @hosts from it.
    # @raise [LogStash::ConfigurationError] when both cloud_id and non-default hosts are set
    def fill_hosts_from_cloud_id
      return unless @cloud_id

      if @hosts && !hosts_default?(@hosts)
        raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
      end
      @hosts = parse_host_uri_from_cloud_id(@cloud_id)
    end

    # @param cloud_id [String] the configured cloud_id setting
    # @return [LogStash::Util::SafeURI] URI built from the decoded scheme and host
    # @raise [LogStash::ConfigurationError] when cloud_id support cannot be loaded
    #   or the value is rejected by CloudSettingId
    def parse_host_uri_from_cloud_id(cloud_id)
      begin # might not be available on older LS
        require 'logstash/util/cloud_setting_id'
      rescue LoadError
        raise LogStash::ConfigurationError, 'The cloud_id setting is not supported by your version of Logstash, ' +
            'please upgrade your installation (or set hosts instead).'
      end

      begin
        cloud_id = LogStash::Util::CloudSettingId.new(cloud_id) # already does append ':{port}' to host
      rescue ArgumentError => e
        raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Id/i, 'cloud_id')
      end
      cloud_uri = "#{cloud_id.elasticsearch_scheme}://#{cloud_id.elasticsearch_host}"
      LogStash::Util::SafeURI.new(cloud_uri)
    end
    private :parse_host_uri_from_cloud_id

    # @param cloud_auth [String, LogStash::Util::Password] the configured cloud_auth setting
    # @return [Array] two-element array `[username, password]` as exposed by CloudSettingAuth
    # @raise [LogStash::ConfigurationError] when cloud_auth support cannot be loaded
    #   or the value is rejected by CloudSettingAuth
    def parse_user_password_from_cloud_auth(cloud_auth)
      begin # might not be available on older LS
        require 'logstash/util/cloud_setting_auth'
      rescue LoadError
        raise LogStash::ConfigurationError, 'The cloud_auth setting is not supported by your version of Logstash, ' +
            'please upgrade your installation (or set user/password instead).'
      end

      # unwrap the Password wrapper so CloudSettingAuth receives the raw value
      cloud_auth = cloud_auth.value if cloud_auth.is_a?(LogStash::Util::Password)
      begin
        cloud_auth = LogStash::Util::CloudSettingAuth.new(cloud_auth)
      rescue ArgumentError => e
        raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
      end
      [ cloud_auth.username, cloud_auth.password ]
    end
    private :parse_user_password_from_cloud_auth

    # @return [Object, nil] whatever the client reports as its maximum seen major version
    def maximum_seen_major_version
      client.maximum_seen_major_version
    end

    # @return [Boolean] true once the client reports a (truthy) maximum seen major version
    def successful_connection?
      !!maximum_seen_major_version
    end

    # launch a thread that waits for an initial successful connection to the ES cluster to call the given block
    # @param block [Proc] the block to execute upon initial successful connection
    # @return [Thread] the successful connection wait thread
    def setup_after_successful_connection(&block)
      Thread.new do
        sleep_interval = @retry_initial_interval
        until successful_connection? || @stopping.true?
          @logger.debug("Waiting for connectivity to Elasticsearch cluster. Retrying in #{sleep_interval}s")
          Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
          sleep_interval = next_sleep_interval(sleep_interval)
        end
        # the loop may also have ended because we are stopping; only run the block when connected
        block.call if successful_connection?
      end
    end

    # Fetches '/' from the cluster and stores its 'cluster_uuid' in plugin_metadata
    # (when plugin_metadata is defined). Any StandardError is deliberately swallowed.
    def discover_cluster_uuid
      return unless defined?(plugin_metadata)
      cluster_info = client.get('/')
      plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
    rescue => e
      # TODO introducing this logging message breaks many tests that need refactoring
      # @logger.error("Unable to retrieve elasticsearch cluster uuid", error => e.message)
    end

    # Submits the actions and keeps re-submitting whatever `submit` hands back
    # (or the same actions, when `submit` raised) until nothing is left.
    # Sleeps between attempts with a doubling interval capped at @retry_max_interval.
    # @param actions [Array] the bulk actions to send
    def retrying_submit(actions)
      # Initially we submit the full list of actions
      submit_actions = actions

      sleep_interval = @retry_initial_interval

      while submit_actions && submit_actions.length > 0

        # We retry with whatever didn't succeed
        begin
          submit_actions = submit(submit_actions)
          if submit_actions && submit_actions.size > 0
            @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request.", :count => submit_actions.size)
          end
        rescue => e
          # submit_actions is left untouched here, so the same actions are sent again
          @logger.error("Encountered an unexpected error submitting a bulk request! Will retry.",
                        :error_message => e.message,
                        :class => e.class.name,
                        :backtrace => e.backtrace)
        end

        # Everything was a success!
        break if !submit_actions || submit_actions.empty?

        # If we're retrying the action sleep for the recommended interval
        # Double the interval for the next time through to achieve exponential backoff
        sleep_interval = sleep_for_interval(sleep_interval)
      end
    end

    # Sleeps (interruptibly, ends early once @stopping is true) for the given interval.
    # @param sleep_interval [Numeric] seconds to sleep
    # @return [Numeric] the interval to use for the next sleep
    def sleep_for_interval(sleep_interval)
      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
      next_sleep_interval(sleep_interval)
    end

    # @param current_interval [Numeric] the interval that was just used
    # @return [Numeric] twice the current interval, capped at @retry_max_interval
    def next_sleep_interval(current_interval)
      [current_interval * 2, @retry_max_interval].min
    end

    # Sends one bulk request and classifies the per-document results.
    # @param actions [Array] the bulk actions; each is indexed as [action_type, params, event]
    # @return [Array, nil] the actions that should be retried, or nil when the bulk
    #   request was aborted (shutdown) or completed without any errors
    def submit(actions)
      bulk_response = safe_bulk(actions)

      # If the response is nil that means we were in a retry loop
      # and aborted since we're shutting down
      return if bulk_response.nil?

      # If it did return and there are no errors we're good as well
      if bulk_response["errors"]
        @bulk_request_metrics.increment(:with_errors)
      else
        @bulk_request_metrics.increment(:successes)
        @document_level_metrics.increment(:successes, actions.size)
        return
      end

      actions_to_retry = []
      bulk_response["items"].each_with_index do |response, idx|
        # each item is a single-entry hash keyed by the action type
        _action_type, action_props = response.first

        status = action_props["status"]
        failure = action_props["error"]
        action = actions[idx]

        # Retry logic: If it is success, we move on. If it is a failure, we have 3 paths:
        # - For 409, we log and drop. there is nothing we can do
        # - For a mapping error, we send to dead letter queue for a human to intervene at a later point.
        # - For everything else there's mastercard. Yep, and we retry indefinitely. This should fix #572 and other transient network issues
        if DOC_SUCCESS_CODES.include?(status)
          @document_level_metrics.increment(:successes)
          next
        elsif DOC_CONFLICT_CODE == status
          @document_level_metrics.increment(:non_retryable_failures)
          @logger.warn "Failed action.", status: status, action: action, response: response if log_failure_type?(failure)
          next
        elsif DOC_DLQ_CODES.include?(status)
          handle_dlq_status("Could not index event to Elasticsearch.", action, status, response)
          @document_level_metrics.increment(:non_retryable_failures)
          next
        else
          # only log what the user whitelisted
          @document_level_metrics.increment(:retryable_failures)
          @logger.info "retrying failed action with response code: #{status} (#{failure})" if log_failure_type?(failure)
          actions_to_retry << action
        end
      end

      actions_to_retry
    end

    # Decides whether a document-level failure should be logged.
    # Tolerates a missing error object: a non-success item without an "error"
    # entry is logged instead of raising NoMethodError on nil.
    # @param failure [Hash, nil] the "error" entry of a bulk response item
    # @return [Boolean] true unless the failure's "type" is in failure_type_logging_whitelist
    def log_failure_type?(failure)
      failure_type = failure && failure["type"]
      !failure_type_logging_whitelist.include?(failure_type)
    end
    private :log_failure_type?

    # Routes a non-retryable document failure to the dead letter queue when
    # @dlq_writer is set; otherwise logs it and drops the event.
    # @param message [String] human readable description of the failure
    # @param action [Array] the failed action; action[2] is written to the DLQ
    # @param status [Integer] the document-level status code
    # @param response [Hash] the bulk response item for this action
    def handle_dlq_status(message, action, status, response)
      # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
      if @dlq_writer
        # TODO: Change this to send a map with { :status => status, :action => action } in the future
        @dlq_writer.write(action[2], "#{message} status: #{status}, action: #{action}, response: #{response}")
      else
        # NOTE(review): only the 'index' entry is inspected, so other action types always log at :warn
        error_type = response.fetch('index', {}).fetch('error', {})['type']
        level = 'invalid_index_name_exception' == error_type ? :error : :warn
        @logger.send level, message, status: status, action: action, response: response
      end
    end

    # Rescue retryable errors during bulk submission
    # @param actions [Array] the bulk actions; each is destructured as (action_type, params, event)
    # @return [Object, nil] the client's bulk response, or nil when a retry was
    #   abandoned because @stopping is true
    def safe_bulk(actions)
      sleep_interval = @retry_initial_interval
      begin
        es_actions = actions.map {|action_type, params, event| [action_type, params, event.to_hash]}
        @client.bulk(es_actions)
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
        # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
        # and let the user sort it out from there
        @logger.error(
          "Attempted to send a bulk request to elasticsearch'"+
            " but Elasticsearch appears to be unreachable or down!",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        # We retry until there are no errors! Errors should all go to the retry queue
        sleep_interval = sleep_for_interval(sleep_interval)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
        @logger.error(
          "Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        sleep_interval = sleep_for_interval(sleep_interval)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
        @bulk_request_metrics.increment(:failures)
        log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
        log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
        message = "Encountered a retryable error. Will Retry with exponential backoff "

        # We treat 429s as a special case because these really aren't errors, but
        # rather just ES telling us to back off a bit, which we do.
        # The other retryable code is 503, which are true errors
        # Even though we retry the user should be made aware of these
        if e.response_code == 429
          logger.debug(message, log_hash)
        else
          logger.error(message, log_hash)
        end

        sleep_interval = sleep_for_interval(sleep_interval)
        # NOTE(review): unlike the other branches this retry is not gated on
        # @stopping -- confirm that retrying through shutdown is intended here
        retry
      rescue => e
        # Stuff that should never happen
        # For all other errors print out full connection issues
        @logger.error(
          "An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
          :error_message => e.message,
          :error_class => e.class.name,
          :backtrace => e.backtrace
        )

        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        sleep_interval = sleep_for_interval(sleep_interval)
        @bulk_request_metrics.increment(:failures)
        retry unless @stopping.true?
      end
    end

    # @return [Boolean] true when an execution context exposing a dlq_writer is
    #   available and its inner writer is not the dummy (no-op) DLQ writer
    def dlq_enabled?
      # TODO there should be a better way to query if DLQ is enabled
      # See more in: https://github.com/elastic/logstash/issues/8064
      respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer) &&
        !execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter)
    end
  end
end; end; end