lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.0.2 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.1.0

- old
+ new

@@ -3,14 +3,23 @@
 require "logstash/namespace"
 require "logstash/plugin_mixins/http_client"
 require "socket" # for Socket.gethostname
 require "manticore"
 require "rufus/scheduler"
+require "logstash/plugin_mixins/ecs_compatibility_support"
+require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
+require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
+require 'logstash/plugin_mixins/event_support/event_factory_adapter'

 class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
   include LogStash::PluginMixins::HttpClient
+  include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
+  include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
+  include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

+  extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
+
   config_name "http_poller"

   default :codec, "json"

   # A Hash of urls in this format : `"name" => "url"`.
@@ -26,11 +35,11 @@
   # b) { "cron" => "* * * * * UTC" }
   # See: rufus/scheduler for details about different schedule options and value string format
   config :schedule, :validate => :hash, :required => true

   # Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
-  config :target, :validate => :string
+  config :target, :validate => :field_reference

   # If you'd like to work with the request/response metadata.
   # Set this value to the name of the field you'd like to store a nested
   # hash of metadata.
   config :metadata_target, :validate => :string, :default => '@metadata'
@@ -40,10 +49,11 @@
   def register
     @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

     @logger.info("Registering http_poller Input", :type => @type, :schedule => @schedule, :timeout => @timeout)

+    setup_ecs_field!
     setup_requests!
   end

   def stop
     Stud.stop!(@interval_thread) if @interval_thread
@@ -54,10 +64,39 @@
   def setup_requests!
     @requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
   end

   private
+  # In the context of ECS, there are two type of events in this plugin, valid HTTP response and failure
+  # For a valid HTTP response, `url`, `request_method` and `host` are metadata of request.
+  # The call could retrieve event which contain `[url]`, `[http][request][method]`, `[host][hostname]` data
+  # Therefore, metadata should not write to those fields
+  # For a failure, `url`, `request_method` and `host` are primary data of the event because the plugin owns this event,
+  # so it writes to url.*, http.*, host.*
+  def setup_ecs_field!
+    @request_host_field = ecs_select[disabled: "[#{metadata_target}][host]", v1: "[#{metadata_target}][input][http_poller][request][host][hostname]"]
+    @response_code_field = ecs_select[disabled: "[#{metadata_target}][code]", v1: "[#{metadata_target}][input][http_poller][response][status_code]"]
+    @response_headers_field = ecs_select[disabled: "[#{metadata_target}][response_headers]", v1: "[#{metadata_target}][input][http_poller][response][headers]"]
+    @response_message_field = ecs_select[disabled: "[#{metadata_target}][response_message]", v1: "[#{metadata_target}][input][http_poller][response][status_message]"]
+    @response_time_s_field = ecs_select[disabled: "[#{metadata_target}][runtime_seconds]", v1: nil]
+    @response_time_ns_field = ecs_select[disabled: nil, v1: "[#{metadata_target}][input][http_poller][response][elapsed_time_ns]"]
+    @request_retry_count_field = ecs_select[disabled: "[#{metadata_target}][times_retried]", v1: "[#{metadata_target}][input][http_poller][request][retry_count]"]
+    @request_name_field = ecs_select[disabled: "[#{metadata_target}][name]", v1: "[#{metadata_target}][input][http_poller][request][name]"]
+    @original_request_field = ecs_select[disabled: "[#{metadata_target}][request]", v1: "[#{metadata_target}][input][http_poller][request][original]"]
+
+    @error_msg_field = ecs_select[disabled: "[http_request_failure][error]", v1: "[error][message]"]
+    @stack_trace_field = ecs_select[disabled: "[http_request_failure][backtrace]", v1: "[error][stack_trace]"]
+    @fail_original_request_field = ecs_select[disabled: "[http_request_failure][request]", v1: nil]
+    @fail_request_name_field = ecs_select[disabled: "[http_request_failure][name]", v1: nil]
+    @fail_response_time_s_field = ecs_select[disabled: "[http_request_failure][runtime_seconds]", v1: nil]
+    @fail_response_time_ns_field = ecs_select[disabled: nil, v1: "[event][duration]"]
+    @fail_request_url_field = ecs_select[disabled: nil, v1: "[url][full]"]
+    @fail_request_method_field = ecs_select[disabled: nil, v1: "[http][request][method]"]
+    @fail_request_host_field = ecs_select[disabled: nil, v1: "[host][hostname]"]
+  end
+
+  private
   def normalize_request(url_or_spec)
     if url_or_spec.is_a?(String)
       res = [:get, url_or_spec]
     elsif url_or_spec.is_a?(Hash)
       # The client will expect keys / values
@@ -149,28 +188,32 @@

     @logger.debug? && @logger.debug("Fetching URL", :name => name, :url => request)
     started = Time.now
     method, *request_opts = request
     client.async.send(method, *request_opts).
-      on_success {|response| handle_success(queue, name, request, response, Time.now - started)}.
-      on_failure {|exception|
-      handle_failure(queue, name, request, exception, Time.now - started)
-    }
+      on_success {|response| handle_success(queue, name, request, response, Time.now - started) }.
+      on_failure {|exception| handle_failure(queue, name, request, exception, Time.now - started) }
   end

   private
+  # time diff in float to nanoseconds
+  def to_nanoseconds(time_diff)
+    (time_diff * 1000000).to_i
+  end
+
+  private
   def handle_success(queue, name, request, response, execution_time)
     body = response.body
     # If there is a usable response. HEAD requests are `nil` and empty get
     # responses come up as "" which will cause the codec to not yield anything
     if body && body.size > 0
       decode_and_flush(@codec, body) do |decoded|
-        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
+        event = @target ? targeted_event_factory.new_event(decoded.to_hash) : decoded
         handle_decoded_event(queue, name, request, response, event, execution_time)
       end
     else
-      event = ::LogStash::Event.new
+      event = event_factory.new_event
       handle_decoded_event(queue, name, request, response, event, execution_time)
     end
   end

   private
@@ -195,25 +238,15 @@
   end

   private
   # Beware, on old versions of manticore some uncommon failures are not handled
   def handle_failure(queue, name, request, exception, execution_time)
-    event = LogStash::Event.new
-    apply_metadata(event, name, request)
-
+    event = event_factory.new_event
     event.tag("_http_request_failure")
+    apply_metadata(event, name, request, nil, execution_time)
+    apply_failure_fields(event, name, request, exception, execution_time)

-    # This is also in the metadata, but we send it anyone because we want this
-    # persisted by default, whereas metadata isn't. People don't like mysterious errors
-    event.set("http_request_failure", {
-      "request" => structure_request(request),
-      "name" => name,
-      "error" => exception.to_s,
-      "backtrace" => exception.backtrace,
-      "runtime_seconds" => execution_time
-    })
-
     queue << event
   rescue StandardError, java.lang.Exception => e
     @logger.error? && @logger.error("Cannot read URL or send the error as an event!",
                                     :exception => e,
                                     :exception_message => e.message,
@@ -229,32 +262,42 @@
                                     :name => name,
                                     :url => request)
   end

   private
-  def apply_metadata(event, name, request, response=nil, execution_time=nil)
+  def apply_metadata(event, name, request, response, execution_time)
     return unless @metadata_target
-    event.set(@metadata_target, event_metadata(name, request, response, execution_time))
-  end
-  private
-  def event_metadata(name, request, response=nil, execution_time=nil)
-    m = {
-      "name" => name,
-      "host" => @host,
-      "request" => structure_request(request),
-    }
+    event.set(@request_host_field, @host)
+    event.set(@response_time_s_field, execution_time) if @response_time_s_field
+    event.set(@response_time_ns_field, to_nanoseconds(execution_time)) if @response_time_ns_field
+    event.set(@request_name_field, name)
+    event.set(@original_request_field, structure_request(request))

-    m["runtime_seconds"] = execution_time
-
     if response
-      m["code"] = response.code
-      m["response_headers"] = response.headers
-      m["response_message"] = response.message
-      m["times_retried"] = response.times_retried
+      event.set(@response_code_field, response.code)
+      event.set(@response_headers_field, response.headers)
+      event.set(@response_message_field, response.message)
+      event.set(@request_retry_count_field, response.times_retried)
     end
+  end
+
-
-    m
+  private
+  def apply_failure_fields(event, name, request, exception, execution_time)
+    # This is also in the metadata, but we send it anyone because we want this
+    # persisted by default, whereas metadata isn't. People don't like mysterious errors
+    event.set(@fail_original_request_field, structure_request(request)) if @fail_original_request_field
+    event.set(@fail_request_name_field, name) if @fail_request_name_field
+
+    method, url, _ = request
+    event.set(@fail_request_url_field, url) if @fail_request_url_field
+    event.set(@fail_request_method_field, method.to_s) if @fail_request_method_field
+    event.set(@fail_request_host_field, @host) if @fail_request_host_field
+
+    event.set(@fail_response_time_s_field, execution_time) if @fail_response_time_s_field
+    event.set(@fail_response_time_ns_field, to_nanoseconds(execution_time)) if @fail_response_time_ns_field
+    event.set(@error_msg_field, exception.to_s)
+    event.set(@stack_trace_field, exception.backtrace)
   end

   private
   # Turn [method, url, spec] requests into a hash for friendlier logging / ES indexing
   def structure_request(request)
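
About the new setup_ecs_field! mapping: ecs_select comes from the ecs_compatibility_support mixin required above, and it picks one of the two field references depending on the pipeline's ecs_compatibility setting. The snippet below is a minimal, hypothetical stand-in (not the plugin's actual mixin code) showing how the success-path metadata fields resolve, assuming metadata_target keeps its default of '@metadata':

  # Rough stand-in for ecs_select -- illustration only; the real helper lives in
  # logstash/plugin_mixins/ecs_compatibility_support.
  def resolve_field(ecs_compatibility, mapping)
    # :disabled keeps the legacy layout; :v1 and :v8 share the ECS layout (:v8 => :v1 above)
    mapping.fetch(ecs_compatibility == :disabled ? :disabled : :v1)
  end

  metadata_target = '@metadata'

  resolve_field(:disabled,
                disabled: "[#{metadata_target}][code]",
                v1: "[#{metadata_target}][input][http_poller][response][status_code]")
  # => "[@metadata][code]"  (legacy layout)

  resolve_field(:v8,
                disabled: "[#{metadata_target}][code]",
                v1: "[#{metadata_target}][input][http_poller][response][status_code]")
  # => "[@metadata][input][http_poller][response][status_code]"  (ECS layout)

Fields mapped to nil in one mode (for example @response_time_s_field under v1) are simply skipped by the "if @..._field" guards in apply_metadata and apply_failure_fields.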
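
In the same spirit, a rough sketch of the event shape handle_failure produces in each mode; the field names come from apply_failure_fields above, while the example values (URL, error text, timings) are invented for illustration:

  # Legacy layout (ecs_compatibility => disabled): everything under http_request_failure.*
  legacy_failure = {
    "tags" => ["_http_request_failure"],
    "http_request_failure" => {
      "request"         => {},                   # filled by structure_request(request)
      "name"            => "es_health",          # invented request name
      "error"           => "Connection refused",
      "backtrace"       => nil,
      "runtime_seconds" => 0.012
    }
  }

  # ECS layout (ecs_compatibility => v1 or v8): standard ECS fields instead
  ecs_failure = {
    "tags"  => ["_http_request_failure"],
    "url"   => { "full" => "http://localhost:9200" },    # invented URL
    "http"  => { "request" => { "method" => "get" } },
    "host"  => { "hostname" => "poller-01" },             # invented hostname
    "event" => { "duration" => 12_000 },                  # to_nanoseconds(0.012) as defined above
    "error" => { "message" => "Connection refused", "stack_trace" => nil }
  }

In both modes the request metadata is additionally written under metadata_target (default [@metadata]) via apply_metadata, which, as the original comment notes, is not persisted by default.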