lib/logstash/inputs/gelf.rb in logstash-input-gelf-0.1.3 vs lib/logstash/inputs/gelf.rb in logstash-input-gelf-0.1.4
- old
+ new
@@ -44,30 +44,46 @@
public
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
+ @shutdown_requested = false
+ @udp = nil
end # def initialize
public
def register
require 'gelfd'
- @udp = nil
end # def register
public
def run(output_queue)
begin
# udp server
udp_listener(output_queue)
+ rescue LogStash::ShutdownSignal
+ @shutdown_requested = true
rescue => e
- @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
- sleep(5)
- retry
+ unless @shutdown_requested
+ @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
+ sleep(5)
+ retry
+ end
end # begin
end # def run
+ public
+ def teardown
+ @shutdown_requested = true
+ if @udp
+ @udp.close_read rescue nil
+ @udp.close_write rescue nil
+ @udp = nil
+ end
+ finished
+ end
+
private
def udp_listener(output_queue)
@logger.info("Starting gelf listener", :address => "#{@host}:#{@port}")
if @udp
@@ -76,11 +92,11 @@
end
@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port)
- while true
+ while !@shutdown_requested
line, client = @udp.recvfrom(8192)
begin
data = Gelfd::Parser.parse(line)
rescue => ex
@logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
@@ -89,25 +105,20 @@
# Gelfd parser outputs null if it received and cached a non-final chunk
next if data.nil?
event = LogStash::Event.new(LogStash::Json.load(data))
+
event["source_host"] = client[3]
if event["timestamp"].is_a?(Numeric)
event.timestamp = LogStash::Timestamp.at(event["timestamp"])
event.remove("timestamp")
end
remap_gelf(event) if @remap
strip_leading_underscore(event) if @strip_leading_underscore
decorate(event)
+
output_queue << event
- end
- rescue LogStash::ShutdownSignal
- # Do nothing, shutdown.
- ensure
- if @udp
- @udp.close_read rescue nil
- @udp.close_write rescue nil
end
end # def udp_listener
private
def remap_gelf(event)