lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.1.1 vs lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.2.0

- old
+ new

@@ -65,147 +65,140 @@ # Whether or not to use TCP or/and UDP config :use_tcp, :validate => :boolean, :default => false config :use_udp, :validate => :boolean, :default => true - public def initialize(params) super BasicSocket.do_not_reverse_lookup = true - end # def initialize + end - public def register require 'gelfd' @port_tcp ||= @port @port_udp ||= @port end - public def run(output_queue) begin if @use_tcp - tcp_thr = Thread.new(output_queue) do |output_queue| - tcp_listener(output_queue) - end + @tcp_thr = Thread.new(output_queue) { |output_queue| tcp_listener(output_queue) } end if @use_udp - udp_thr = Thread.new(output_queue) do |output_queue| - udp_listener(output_queue) - end + @udp_thr = Thread.new(output_queue) { |output_queue| udp_listener(output_queue) } end rescue => e unless stop? @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace) Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? } - retry unless stop? + if !stop? + # before retrying make sure we close all sockets + stop + wait_server_thread + retry + end end - end # begin - if @use_tcp - tcp_thr.join end - if @use_udp - udp_thr.join - end - end # def run - public + wait_server_thread + end + def stop begin @udp.close if @use_udp - rescue IOError => e - @logger.warn("Caugh exception while closing udp socket", :exception => e.inspect) + @udp = nil + rescue => e + @logger.debug("Caught exception while closing udp socket", :exception => e) end begin @tcp.close if @use_tcp - rescue IOError => e - @logger.warn("Caugh exception while closing tcp socket", :exception => e.inspect) + @tcp = nil + rescue => e + @logger.debug("Caught exception while closing tcp socket", :exception => e) end end private - def tcp_listener(output_queue) + def wait_server_thread + @tcp_thr.join if @use_tcp + @udp_thr.join if @use_udp + end + + def tcp_listener(output_queue) @logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port_tcp}") if @tcp.nil? @tcp = TCPServer.new(@host, @port_tcp) end - while !@shutdown_requested + while !stop? Thread.new(@tcp.accept) do |client| - @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}") - begin - while !client.nil? && !client.eof? + @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}") + while !client.nil? && !client.eof? && !stop? + begin # Read from socket data_in = client.gets("\u0000") rescue => ex - @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace) + if !stop? + @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace) + end end if data_in.nil? - @logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.") + @logger.debug("Gelf (tcp): socket read succeeded, but data is nil. Skipping.") next - end + end # data received. Remove trailing \0 data_in[-1] == "\u0000" && data_in = data_in[0...-1] - begin # Parse JSON - jsonObj = JSON.parse(data_in) - rescue => ex - @logger.warn("Gelf (tcp): failed to parse a message. Skipping: " + data_in, :exception => ex, :backtrace => ex.backtrace) - next - end - begin # Create event - event = LogStash::Event.new(jsonObj) - event.set(SOURCE_HOST_FIELD, host.force_encoding("UTF-8")) - if event.get("timestamp").is_a?(Numeric) - event.set("timestamp", LogStash::Timestamp.at(event.get("timestamp"))) - event.remove("timestamp") - end - remap_gelf(event) if @remap - strip_leading_underscore(event) if @strip_leading_underscore - decorate(event) - output_queue << event - rescue => ex - @logger.warn("Gelf (tcp): failed to create event from json object. Skipping: " + jsonObj.to_s, :exception => ex, :backtrace => ex.backtrace) - end + event = self.class.new_event(data_in, client.peeraddr[3]) + next if event.nil? - end # while client + remap_gelf(event) if @remap + strip_leading_underscore(event) if @strip_leading_underscore + decorate(event) + output_queue << event + end + @logger.debug? && @logger.debug("Gelf (tcp): Closing client connection") - client.close + client.close rescue nil client = nil rescue => ex - @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace) + if !stop? + @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace) + end ensure if !client.nil? @logger.debug? && @logger.debug("Gelf (tcp): Ensuring client is closed") - client.close + client.close rescue nil client = nil end - end # begin client - end # Thread.new - end # @shutdown_requested - + end + end + end + ensure + @logger.debug? && @logger.debug("Gelf (tcp): Ensuring tcp server is closed") + @tcp.close rescue nil + @tcp = nil end - private def udp_listener(output_queue) @logger.info("Starting gelf listener (udp) ...", :address => "#{@host}:#{@port_udp}") @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port_udp) - while !@udp.closed? + while !stop? begin line, client = @udp.recvfrom(8192) rescue => e - if !stop? # if we're shutting down there's no point in logging anything - @logger.error("Caught exception while reading from UDP socket", :exception => e.inspect) + if !stop? + @logger.error("Caught exception while reading from UDP socket", :exception => e) end next end begin @@ -225,11 +218,15 @@ strip_leading_underscore(event) if @strip_leading_underscore decorate(event) output_queue << event end - end # def udp_listener + ensure + @logger.debug? && @logger.debug("Gelf (udp): Ensuring udp socket is closed") + @udp.close rescue nil + @udp = nil + end # generate a new LogStash::Event from json input and assign host to source_host event field. # @param json_gelf [String] GELF json data # @param host [String] source host of GELF data # @return [LogStash::Event] new event with parsed json gelf, assigned source host and coerced timestamp @@ -254,36 +251,18 @@ def self.coerce_timestamp(timestamp) # bug in JRuby prevents correcly parsing a BigDecimal fractional part, see https://github.com/elastic/logstash/issues/4565 timestamp.is_a?(BigDecimal) ? LogStash::Timestamp.at(timestamp.to_i, timestamp.frac * 1000000) : LogStash::Timestamp.at(timestamp) end - # from_json_parse uses the Event#from_json method to deserialize and directly produce events - def self.from_json_parse(json) - # from_json will always return an array of item. - # in the context of gelf, the payload should be an array of 1 - LogStash::Event.from_json(json).first - rescue LogStash::Json::ParserError => e - logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json) - LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_fromjsonparser']) - end # def self.from_json_parse - - # legacy_parse uses the LogStash::Json class to deserialize json - def self.legacy_parse(json) + def self.parse(json) o = LogStash::Json.load(json) LogStash::Event.new(o) rescue LogStash::Json::ParserError => e - logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json) - LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_legacyjsonparser']) - end # def self.parse - - # keep compatibility with all v2.x distributions. only in 2.3 will the Event#from_json method be introduced - # and we need to keep compatibility for all v2 releases. - class << self - alias_method :parse, LogStash::Event.respond_to?(:from_json) ? :from_json_parse : :legacy_parse + logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json.inspect) + LogStash::Event.new(MESSAGE_FIELD => json.inspect, TAGS_FIELD => [PARSE_FAILURE_TAG]) end - private def remap_gelf(event) if event.get("full_message") && !event.get("full_message").empty? event.set("message", event.get("full_message").dup) event.remove("full_message") if event.get("short_message") == event.get("message") @@ -291,17 +270,16 @@ end elsif event.get("short_message") && !event.get("short_message").empty? event.set("message", event.get("short_message").dup) event.remove("short_message") end - end # def remap_gelf + end - private def strip_leading_underscore(event) # Map all '_foo' fields to simply 'foo' event.to_hash.keys.each do |key| next unless key[0,1] == "_" event.set(key[1..-1], event.get(key)) event.remove(key) end - end # deef removing_leading_underscores -end # class LogStash::Inputs::Gelf + end +end