lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.0.2 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.1.0
- old
+ new
@@ -3,14 +3,23 @@
require "logstash/namespace"
require "logstash/plugin_mixins/http_client"
require "socket" # for Socket.gethostname
require "manticore"
require "rufus/scheduler"
+require "logstash/plugin_mixins/ecs_compatibility_support"
+require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
+require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
+require 'logstash/plugin_mixins/event_support/event_factory_adapter'
class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
include LogStash::PluginMixins::HttpClient
+ include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
+ include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
+ include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
+ extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
+
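The new mixins come from shared Logstash plugin support libraries: ECSCompatibilitySupport resolves the effective `ecs_compatibility` mode (with `:v8` treated as `:v1` here) and provides `ecs_select`, TargetCheck is intended to warn at registration time when ECS compatibility is enabled but no `target` is set, and EventFactoryAdapter supplies the `event_factory`/`targeted_event_factory` helpers used further down. A minimal sketch of how `ecs_select` picks a value (the field names are illustrative), assuming the plugin runs with `ecs_compatibility => v1`:

    ecs_select[disabled: "[code]", v1: "[status_code]"]  #=> "[status_code]"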
config_name "http_poller"
default :codec, "json"
# A Hash of urls in this format : `"name" => "url"`.
@@ -26,11 +35,11 @@
# b) { "cron" => "* * * * * UTC" }
# See: rufus/scheduler for details about different schedule options and value string format
config :schedule, :validate => :hash, :required => true
# Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
- config :target, :validate => :string
+ config :target, :validate => :field_reference
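Switching the validator from `:string` to `:field_reference` means the value must parse as a Logstash field reference: a nested target such as `target => "[document][nested]"` is accepted, while a malformed reference (for example an unclosed `[document`) is now rejected at pipeline startup instead of being passed through as a plain string.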
# If you'd like to work with the request/response metadata.
# Set this value to the name of the field you'd like to store a nested
# hash of metadata.
config :metadata_target, :validate => :string, :default => '@metadata'
@@ -40,10 +49,11 @@
def register
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
@logger.info("Registering http_poller Input", :type => @type, :schedule => @schedule, :timeout => @timeout)
+ setup_ecs_field!
setup_requests!
end
def stop
Stud.stop!(@interval_thread) if @interval_thread
@@ -54,10 +64,39 @@
def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
end
private
+ # In the context of ECS, this plugin produces two types of events: valid HTTP responses and failures.
+ # For a valid HTTP response, `url`, `request_method` and `host` are metadata of the request.
+ # The decoded event may already contain `[url]`, `[http][request][method]` and `[host][hostname]` data,
+ # so request metadata must not be written to those fields.
+ # For a failure, `url`, `request_method` and `host` are primary data of the event because the plugin owns the event,
+ # so it writes to url.*, http.* and host.*
+ def setup_ecs_field!
+ @request_host_field = ecs_select[disabled: "[#{metadata_target}][host]", v1: "[#{metadata_target}][input][http_poller][request][host][hostname]"]
+ @response_code_field = ecs_select[disabled: "[#{metadata_target}][code]", v1: "[#{metadata_target}][input][http_poller][response][status_code]"]
+ @response_headers_field = ecs_select[disabled: "[#{metadata_target}][response_headers]", v1: "[#{metadata_target}][input][http_poller][response][headers]"]
+ @response_message_field = ecs_select[disabled: "[#{metadata_target}][response_message]", v1: "[#{metadata_target}][input][http_poller][response][status_message]"]
+ @response_time_s_field = ecs_select[disabled: "[#{metadata_target}][runtime_seconds]", v1: nil]
+ @response_time_ns_field = ecs_select[disabled: nil, v1: "[#{metadata_target}][input][http_poller][response][elapsed_time_ns]"]
+ @request_retry_count_field = ecs_select[disabled: "[#{metadata_target}][times_retried]", v1: "[#{metadata_target}][input][http_poller][request][retry_count]"]
+ @request_name_field = ecs_select[disabled: "[#{metadata_target}][name]", v1: "[#{metadata_target}][input][http_poller][request][name]"]
+ @original_request_field = ecs_select[disabled: "[#{metadata_target}][request]", v1: "[#{metadata_target}][input][http_poller][request][original]"]
+
+ @error_msg_field = ecs_select[disabled: "[http_request_failure][error]", v1: "[error][message]"]
+ @stack_trace_field = ecs_select[disabled: "[http_request_failure][backtrace]", v1: "[error][stack_trace]"]
+ @fail_original_request_field = ecs_select[disabled: "[http_request_failure][request]", v1: nil]
+ @fail_request_name_field = ecs_select[disabled: "[http_request_failure][name]", v1: nil]
+ @fail_response_time_s_field = ecs_select[disabled: "[http_request_failure][runtime_seconds]", v1: nil]
+ @fail_response_time_ns_field = ecs_select[disabled: nil, v1: "[event][duration]"]
+ @fail_request_url_field = ecs_select[disabled: nil, v1: "[url][full]"]
+ @fail_request_method_field = ecs_select[disabled: nil, v1: "[http][request][method]"]
+ @fail_request_host_field = ecs_select[disabled: nil, v1: "[host][hostname]"]
+ end
+
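With the default `metadata_target => "@metadata"`, the selections above resolve to concrete field references, for example (legacy value on the left, ECS v1 on the right):

    @response_code_field  #=> "[@metadata][code]"             / "[@metadata][input][http_poller][response][status_code]"
    @error_msg_field      #=> "[http_request_failure][error]" / "[error][message]"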
+ private
def normalize_request(url_or_spec)
if url_or_spec.is_a?(String)
res = [:get, url_or_spec]
elsif url_or_spec.is_a?(Hash)
# The client will expect keys / values
@@ -149,28 +188,32 @@
@logger.debug? && @logger.debug("Fetching URL", :name => name, :url => request)
started = Time.now
method, *request_opts = request
client.async.send(method, *request_opts).
- on_success {|response| handle_success(queue, name, request, response, Time.now - started)}.
- on_failure {|exception|
- handle_failure(queue, name, request, exception, Time.now - started)
- }
+ on_success {|response| handle_success(queue, name, request, response, Time.now - started) }.
+ on_failure {|exception| handle_failure(queue, name, request, exception, Time.now - started) }
end
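Note that `client.async` only queues the request and registers the callbacks; the `on_success`/`on_failure` blocks run once the batch of queued requests is executed through Manticore's `execute!`, which the polling loop is expected to invoke after all URLs have been queued.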
private
+ # time diff in float to nanoseconds
+ def to_nanoseconds(time_diff)
+ (time_diff * 1000000).to_i
+ end
+
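For example, with the conversion factor used here:

    to_nanoseconds(1.5)  #=> 1500000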
+ private
def handle_success(queue, name, request, response, execution_time)
body = response.body
# If there is a usable response: HEAD requests are `nil` and empty GET
# responses come up as "", which would cause the codec to not yield anything
if body && body.size > 0
decode_and_flush(@codec, body) do |decoded|
- event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
+ event = @target ? targeted_event_factory.new_event(decoded.to_hash) : decoded
handle_decoded_event(queue, name, request, response, event, execution_time)
end
else
- event = ::LogStash::Event.new
+ event = event_factory.new_event
handle_decoded_event(queue, name, request, response, event, execution_time)
end
end
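The event-factory adapter replaces the direct `LogStash::Event.new` calls: `event_factory.new_event` builds a plain event, while `targeted_event_factory.new_event(hash)` nests the payload under the configured `target`. A hedged sketch, assuming `target => "[body]"`:

    event = targeted_event_factory.new_event("status" => "ok")
    event.get("[body][status]")  #=> "ok"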
private
@@ -195,25 +238,15 @@
end
private
# Beware, on old versions of manticore some uncommon failures are not handled
def handle_failure(queue, name, request, exception, execution_time)
- event = LogStash::Event.new
- apply_metadata(event, name, request)
-
+ event = event_factory.new_event
event.tag("_http_request_failure")
+ apply_metadata(event, name, request, nil, execution_time)
+ apply_failure_fields(event, name, request, exception, execution_time)
- # This is also in the metadata, but we send it anyway because we want this
- # persisted by default, whereas metadata isn't. People don't like mysterious errors
- event.set("http_request_failure", {
- "request" => structure_request(request),
- "name" => name,
- "error" => exception.to_s,
- "backtrace" => exception.backtrace,
- "runtime_seconds" => execution_time
- })
-
queue << event
rescue StandardError, java.lang.Exception => e
@logger.error? && @logger.error("Cannot read URL or send the error as an event!",
:exception => e,
:exception_message => e.message,
@@ -229,32 +262,42 @@
:name => name,
:url => request)
end
private
- def apply_metadata(event, name, request, response=nil, execution_time=nil)
+ def apply_metadata(event, name, request, response, execution_time)
return unless @metadata_target
- event.set(@metadata_target, event_metadata(name, request, response, execution_time))
- end
- private
- def event_metadata(name, request, response=nil, execution_time=nil)
- m = {
- "name" => name,
- "host" => @host,
- "request" => structure_request(request),
- }
+ event.set(@request_host_field, @host)
+ event.set(@response_time_s_field, execution_time) if @response_time_s_field
+ event.set(@response_time_ns_field, to_nanoseconds(execution_time)) if @response_time_ns_field
+ event.set(@request_name_field, name)
+ event.set(@original_request_field, structure_request(request))
- m["runtime_seconds"] = execution_time
-
if response
- m["code"] = response.code
- m["response_headers"] = response.headers
- m["response_message"] = response.message
- m["times_retried"] = response.times_retried
+ event.set(@response_code_field, response.code)
+ event.set(@response_headers_field, response.headers)
+ event.set(@response_message_field, response.message)
+ event.set(@request_retry_count_field, response.times_retried)
end
+ end
- m
+ private
+ def apply_failure_fields(event, name, request, exception, execution_time)
+ # This is also in the metadata, but we send it anyway because we want this
+ # persisted by default, whereas metadata isn't. People don't like mysterious errors
+ event.set(@fail_original_request_field, structure_request(request)) if @fail_original_request_field
+ event.set(@fail_request_name_field, name) if @fail_request_name_field
+
+ method, url, _ = request
+ event.set(@fail_request_url_field, url) if @fail_request_url_field
+ event.set(@fail_request_method_field, method.to_s) if @fail_request_method_field
+ event.set(@fail_request_host_field, @host) if @fail_request_host_field
+
+ event.set(@fail_response_time_s_field, execution_time) if @fail_response_time_s_field
+ event.set(@fail_response_time_ns_field, to_nanoseconds(execution_time)) if @fail_response_time_ns_field
+ event.set(@error_msg_field, exception.to_s)
+ event.set(@stack_trace_field, exception.backtrace)
end
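In ECS v1 mode a failed poll therefore carries `[error][message]`, `[error][stack_trace]`, `[url][full]`, `[http][request][method]`, `[host][hostname]` and `[event][duration]` at the event root, while legacy (disabled) mode keeps the nested `http_request_failure` hash as before.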
private
# Turn [method, url, spec] requests into a hash for friendlier logging / ES indexing
def structure_request(request)