lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.1.1 vs lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.2.0
- old
+ new
@@ -65,147 +65,140 @@
# Whether or not to use TCP or/and UDP
config :use_tcp, :validate => :boolean, :default => false
config :use_udp, :validate => :boolean, :default => true
- public
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
- end # def initialize
+ end
- public
def register
require 'gelfd'
@port_tcp ||= @port
@port_udp ||= @port
end
- public
def run(output_queue)
begin
if @use_tcp
- tcp_thr = Thread.new(output_queue) do |output_queue|
- tcp_listener(output_queue)
- end
+ @tcp_thr = Thread.new(output_queue) { |output_queue| tcp_listener(output_queue) }
end
if @use_udp
- udp_thr = Thread.new(output_queue) do |output_queue|
- udp_listener(output_queue)
- end
+ @udp_thr = Thread.new(output_queue) { |output_queue| udp_listener(output_queue) }
end
rescue => e
unless stop?
@logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? }
- retry unless stop?
+ if !stop?
+ # before retrying make sure we close all sockets
+ stop
+ wait_server_thread
+ retry
+ end
end
- end # begin
- if @use_tcp
- tcp_thr.join
end
- if @use_udp
- udp_thr.join
- end
- end # def run
- public
+ wait_server_thread
+ end
+
def stop
begin
@udp.close if @use_udp
- rescue IOError => e
- @logger.warn("Caugh exception while closing udp socket", :exception => e.inspect)
+ @udp = nil
+ rescue => e
+ @logger.debug("Caught exception while closing udp socket", :exception => e)
end
begin
@tcp.close if @use_tcp
- rescue IOError => e
- @logger.warn("Caugh exception while closing tcp socket", :exception => e.inspect)
+ @tcp = nil
+ rescue => e
+ @logger.debug("Caught exception while closing tcp socket", :exception => e)
end
end
private
- def tcp_listener(output_queue)
+ def wait_server_thread
+ @tcp_thr.join if @use_tcp
+ @udp_thr.join if @use_udp
+ end
+
+ def tcp_listener(output_queue)
@logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port_tcp}")
if @tcp.nil?
@tcp = TCPServer.new(@host, @port_tcp)
end
- while !@shutdown_requested
+ while !stop?
Thread.new(@tcp.accept) do |client|
- @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}")
-
begin
- while !client.nil? && !client.eof?
+ @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}")
+ while !client.nil? && !client.eof? && !stop?
+
begin # Read from socket
data_in = client.gets("\u0000")
rescue => ex
- @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
+ if !stop?
+ @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
+ end
end
if data_in.nil?
- @logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
+ @logger.debug("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
next
- end
+ end
# data received. Remove trailing \0
data_in[-1] == "\u0000" && data_in = data_in[0...-1]
- begin # Parse JSON
- jsonObj = JSON.parse(data_in)
- rescue => ex
- @logger.warn("Gelf (tcp): failed to parse a message. Skipping: " + data_in, :exception => ex, :backtrace => ex.backtrace)
- next
- end
- begin # Create event
- event = LogStash::Event.new(jsonObj)
- event.set(SOURCE_HOST_FIELD, host.force_encoding("UTF-8"))
- if event.get("timestamp").is_a?(Numeric)
- event.set("timestamp", LogStash::Timestamp.at(event.get("timestamp")))
- event.remove("timestamp")
- end
- remap_gelf(event) if @remap
- strip_leading_underscore(event) if @strip_leading_underscore
- decorate(event)
- output_queue << event
- rescue => ex
- @logger.warn("Gelf (tcp): failed to create event from json object. Skipping: " + jsonObj.to_s, :exception => ex, :backtrace => ex.backtrace)
- end
+ event = self.class.new_event(data_in, client.peeraddr[3])
+ next if event.nil?
- end # while client
+ remap_gelf(event) if @remap
+ strip_leading_underscore(event) if @strip_leading_underscore
+ decorate(event)
+ output_queue << event
+ end
+
@logger.debug? && @logger.debug("Gelf (tcp): Closing client connection")
- client.close
+ client.close rescue nil
client = nil
rescue => ex
- @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace)
+ if !stop?
+ @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace)
+ end
ensure
if !client.nil?
@logger.debug? && @logger.debug("Gelf (tcp): Ensuring client is closed")
- client.close
+ client.close rescue nil
client = nil
end
- end # begin client
- end # Thread.new
- end # @shutdown_requested
-
+ end
+ end
+ end
+ ensure
+ @logger.debug? && @logger.debug("Gelf (tcp): Ensuring tcp server is closed")
+ @tcp.close rescue nil
+ @tcp = nil
end
- private
def udp_listener(output_queue)
@logger.info("Starting gelf listener (udp) ...", :address => "#{@host}:#{@port_udp}")
@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port_udp)
- while !@udp.closed?
+ while !stop?
begin
line, client = @udp.recvfrom(8192)
rescue => e
- if !stop? # if we're shutting down there's no point in logging anything
- @logger.error("Caught exception while reading from UDP socket", :exception => e.inspect)
+ if !stop?
+ @logger.error("Caught exception while reading from UDP socket", :exception => e)
end
next
end
begin
@@ -225,11 +218,15 @@
strip_leading_underscore(event) if @strip_leading_underscore
decorate(event)
output_queue << event
end
- end # def udp_listener
+ ensure
+ @logger.debug? && @logger.debug("Gelf (udp): Ensuring udp socket is closed")
+ @udp.close rescue nil
+ @udp = nil
+ end
# generate a new LogStash::Event from json input and assign host to source_host event field.
# @param json_gelf [String] GELF json data
# @param host [String] source host of GELF data
# @return [LogStash::Event] new event with parsed json gelf, assigned source host and coerced timestamp
@@ -254,36 +251,18 @@
def self.coerce_timestamp(timestamp)
# bug in JRuby prevents correcly parsing a BigDecimal fractional part, see https://github.com/elastic/logstash/issues/4565
timestamp.is_a?(BigDecimal) ? LogStash::Timestamp.at(timestamp.to_i, timestamp.frac * 1000000) : LogStash::Timestamp.at(timestamp)
end
- # from_json_parse uses the Event#from_json method to deserialize and directly produce events
- def self.from_json_parse(json)
- # from_json will always return an array of item.
- # in the context of gelf, the payload should be an array of 1
- LogStash::Event.from_json(json).first
- rescue LogStash::Json::ParserError => e
- logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json)
- LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_fromjsonparser'])
- end # def self.from_json_parse
-
- # legacy_parse uses the LogStash::Json class to deserialize json
- def self.legacy_parse(json)
+ def self.parse(json)
o = LogStash::Json.load(json)
LogStash::Event.new(o)
rescue LogStash::Json::ParserError => e
- logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json)
- LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_legacyjsonparser'])
- end # def self.parse
-
- # keep compatibility with all v2.x distributions. only in 2.3 will the Event#from_json method be introduced
- # and we need to keep compatibility for all v2 releases.
- class << self
- alias_method :parse, LogStash::Event.respond_to?(:from_json) ? :from_json_parse : :legacy_parse
+ logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json.inspect)
+ LogStash::Event.new(MESSAGE_FIELD => json.inspect, TAGS_FIELD => [PARSE_FAILURE_TAG])
end
- private
def remap_gelf(event)
if event.get("full_message") && !event.get("full_message").empty?
event.set("message", event.get("full_message").dup)
event.remove("full_message")
if event.get("short_message") == event.get("message")
@@ -291,17 +270,16 @@
end
elsif event.get("short_message") && !event.get("short_message").empty?
event.set("message", event.get("short_message").dup)
event.remove("short_message")
end
- end # def remap_gelf
+ end
- private
def strip_leading_underscore(event)
# Map all '_foo' fields to simply 'foo'
event.to_hash.keys.each do |key|
next unless key[0,1] == "_"
event.set(key[1..-1], event.get(key))
event.remove(key)
end
- end # deef removing_leading_underscores
-end # class LogStash::Inputs::Gelf
+ end
+end