lib/logstash/inputs/beats_support/connection_handler.rb in logstash-input-beats-2.2.7 vs lib/logstash/inputs/beats_support/connection_handler.rb in logstash-input-beats-2.2.8

- old
+ new

@@ -1,9 +1,10 @@ # encoding: utf-8 require "logstash/inputs/beats" require "logstash/inputs/beats_support/decoded_event_transform" require "logstash/inputs/beats_support/raw_event_transform" +require "lumberjack/beats" module LogStash::Inputs::BeatsSupport # Handle the data coming from a connection # Decide which Process should be used to decode the data coming # from the beat library. @@ -38,11 +39,15 @@ :event_hash => hash, :identity_stream => identity_stream, :peer => @connection.peer) # Filebeats uses the `message` key and LSF `line` - target_field = @input.target_field_for_codec ? hash.delete(@input.target_field_for_codec) : nil + target_field = if from_filebeat?(hash) + hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD) + elsif from_logstash_forwarder?(hash) + hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD) + end if target_field.nil? @logger.debug? && @logger.debug("Beats input: not using the codec for this event, can't find the codec target field", :target_field_for_codec => @input.target_field_for_codec, :event_hash => hash) @@ -72,8 +77,17 @@ def flush(&block) @logger.debug? && @logger.debug("Beats input, out of band call for flushing the content of this connection", :peer => @connection.peer) @codec.flush(&block) + end + + private + def from_filebeat?(hash) + !hash[Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD].nil? + end + + def from_logstash_forwarder?(hash) + !hash[Lumberjack::Beats::LSF_LOG_LINE_FIELD].nil? end end end