lib/logstash/inputs/delf.rb in logstash-input-delf-3.0.3 vs lib/logstash/inputs/delf.rb in logstash-input-delf-3.1.3

- old
+ new

@@ -3,10 +3,11 @@ require "logstash/namespace" require "logstash/json" require "logstash/timestamp" require "stud/interval" require "date" +require "base64" require "socket" # This input will read GELF messages as events over the network, # making it a good choice if you already use Graylog2 today. # @@ -32,11 +33,11 @@ # The port to listen on. Remember that ports less than 1024 (privileged # ports) may require root to use. config :port, :validate => :number, :default => 12201 # The incomplete mark, line ends with this mark will be considered as a incomplete event - config :continue_mark, :validate => :string, :default => "\\" + config :continue_mark_base64, :validate => :string, :default => "XA==" # The field to identify different event stream, for Docker events, it's 'container_id' config :track, :validate => :string, :default => 'container_id' RECONNECT_BACKOFF_SLEEP = 5 @@ -80,10 +81,11 @@ end private def udp_listener(output_queue) @logger.info("Starting delf listener", :address => "#{@host}:#{@port}") + @continue_mark = Base64.urlsafe_decode64(@continue_mark_base64) @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) while !stop? @@ -197,10 +199,13 @@ # Ignore if no message found message = event.get("message") return event unless message.kind_of?(String) + # strip right + message = message.rstrip + # Fetch last event last_event = @incomplete_events[track_id] if message.end_with?(@continue_mark) # remove the continue_mark @@ -212,11 +217,11 @@ # cache it as a pending event @incomplete_events[track_id] = event return nil else # append content to pending event - last_event.set("message", last_event.get("message") + "\n" + message) + last_event.set("message", last_event.get("message") + "\r\n" + message) # limit message length to 5000 if last_event.get("message").length > 5000 @incomplete_events[track_id] = nil return last_event else @@ -228,10 +233,10 @@ if last_event.nil? # just return if no pending incomplete event return event else # append content to pending incomplete event and return it - last_event.set("message", last_event.get("message") + "\n" + message) + last_event.set("message", last_event.get("message") + "\r\n" + message) # clear the pending incomplete event @incomplete_events[track_id] = nil return last_event end end