lib/logstash/inputs/delf.rb in logstash-input-delf-3.0.3 vs lib/logstash/inputs/delf.rb in logstash-input-delf-3.1.3
- old
+ new
@@ -3,10 +3,11 @@
require "logstash/namespace"
require "logstash/json"
require "logstash/timestamp"
require "stud/interval"
require "date"
+require "base64"
require "socket"
# This input will read GELF messages as events over the network,
# making it a good choice if you already use Graylog2 today.
#
@@ -32,11 +33,11 @@
# The port to listen on. Remember that ports less than 1024 (privileged
# ports) may require root to use.
config :port, :validate => :number, :default => 12201
# The incomplete mark, line ends with this mark will be considered as a incomplete event
- config :continue_mark, :validate => :string, :default => "\\"
+ config :continue_mark_base64, :validate => :string, :default => "XA=="
# The field to identify different event stream, for Docker events, it's 'container_id'
config :track, :validate => :string, :default => 'container_id'
RECONNECT_BACKOFF_SLEEP = 5
@@ -80,10 +81,11 @@
end
private
def udp_listener(output_queue)
@logger.info("Starting delf listener", :address => "#{@host}:#{@port}")
+ @continue_mark = Base64.urlsafe_decode64(@continue_mark_base64)
@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port)
while !stop?
@@ -197,10 +199,13 @@
# Ignore if no message found
message = event.get("message")
return event unless message.kind_of?(String)
+ # strip right
+ message = message.rstrip
+
# Fetch last event
last_event = @incomplete_events[track_id]
if message.end_with?(@continue_mark)
# remove the continue_mark
@@ -212,11 +217,11 @@
# cache it as a pending event
@incomplete_events[track_id] = event
return nil
else
# append content to pending event
- last_event.set("message", last_event.get("message") + "\n" + message)
+ last_event.set("message", last_event.get("message") + "\r\n" + message)
# limit message length to 5000
if last_event.get("message").length > 5000
@incomplete_events[track_id] = nil
return last_event
else
@@ -228,10 +233,10 @@
if last_event.nil?
# just return if no pending incomplete event
return event
else
# append content to pending incomplete event and return it
- last_event.set("message", last_event.get("message") + "\n" + message)
+ last_event.set("message", last_event.get("message") + "\r\n" + message)
# clear the pending incomplete event
@incomplete_events[track_id] = nil
return last_event
end
end