lib/logstash/inputs/gelf.rb in logstash-input-gelf-0.1.3 vs lib/logstash/inputs/gelf.rb in logstash-input-gelf-0.1.4

- old
+ new

@@ -44,30 +44,46 @@ public def initialize(params) super BasicSocket.do_not_reverse_lookup = true + @shutdown_requested = false + @udp = nil end # def initialize public def register require 'gelfd' - @udp = nil end # def register public def run(output_queue) begin # udp server udp_listener(output_queue) + rescue LogStash::ShutdownSignal + @shutdown_requested = true rescue => e - @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace) - sleep(5) - retry + unless @shutdown_requested + @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace) + sleep(5) + retry + end end # begin end # def run + public + def teardown + @shutdown_requested = true + if @udp + @udp.close_read rescue nil + @udp.close_write rescue nil + @udp = nil + end + finished + end + private def udp_listener(output_queue) @logger.info("Starting gelf listener", :address => "#{@host}:#{@port}") if @udp @@ -76,11 +92,11 @@ end @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) - while true + while !@shutdown_requested line, client = @udp.recvfrom(8192) begin data = Gelfd::Parser.parse(line) rescue => ex @logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace) @@ -89,25 +105,20 @@ # Gelfd parser outputs null if it received and cached a non-final chunk next if data.nil? event = LogStash::Event.new(LogStash::Json.load(data)) + event["source_host"] = client[3] if event["timestamp"].is_a?(Numeric) event.timestamp = LogStash::Timestamp.at(event["timestamp"]) event.remove("timestamp") end remap_gelf(event) if @remap strip_leading_underscore(event) if @strip_leading_underscore decorate(event) + output_queue << event - end - rescue LogStash::ShutdownSignal - # Do nothing, shutdown. - ensure - if @udp - @udp.close_read rescue nil - @udp.close_write rescue nil end end # def udp_listener private def remap_gelf(event)