lib/logstash/inputs/beats_support/connection_handler.rb in logstash-input-beats-2.2.7 vs lib/logstash/inputs/beats_support/connection_handler.rb in logstash-input-beats-2.2.8
- old
+ new
@@ -1,9 +1,10 @@
# encoding: utf-8
require "logstash/inputs/beats"
require "logstash/inputs/beats_support/decoded_event_transform"
require "logstash/inputs/beats_support/raw_event_transform"
+require "lumberjack/beats"
module LogStash::Inputs::BeatsSupport
# Handle the data coming from a connection
# Decide which Process should be used to decode the data coming
# from the beat library.
@@ -38,11 +39,15 @@
:event_hash => hash,
:identity_stream => identity_stream,
:peer => @connection.peer)
# Filebeats uses the `message` key and LSF `line`
- target_field = @input.target_field_for_codec ? hash.delete(@input.target_field_for_codec) : nil
+ target_field = if from_filebeat?(hash)
+ hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD)
+ elsif from_logstash_forwarder?(hash)
+ hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD)
+ end
if target_field.nil?
@logger.debug? && @logger.debug("Beats input: not using the codec for this event, can't find the codec target field",
:target_field_for_codec => @input.target_field_for_codec,
:event_hash => hash)
@@ -72,8 +77,17 @@
def flush(&block)
@logger.debug? && @logger.debug("Beats input, out of band call for flushing the content of this connection",
:peer => @connection.peer)
@codec.flush(&block)
+ end
+
+ private
+ def from_filebeat?(hash)
+ !hash[Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD].nil?
+ end
+
+ def from_logstash_forwarder?(hash)
+ !hash[Lumberjack::Beats::LSF_LOG_LINE_FIELD].nil?
end
end
end