lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.0.7 vs lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.1.0

- old
+ new

@@ -4,10 +4,11 @@ require "logstash/json" require "logstash/timestamp" require "stud/interval" require "date" require "socket" +require "json" # This input will read GELF messages as events over the network, # making it a good choice if you already use Graylog2 today. # # The main use case for this input is to leverage existing GELF @@ -27,13 +28,16 @@ default :codec, "plain" # The IP address or hostname to listen on. config :host, :validate => :string, :default => "0.0.0.0" - # The port to listen on. Remember that ports less than 1024 (privileged + # The ports to listen on. Remember that ports less than 1024 (privileged # ports) may require root to use. + # port_tcp and port_udp can be used to have a different port for udp than the tcp port. config :port, :validate => :number, :default => 12201 + config :port_tcp, :validate => :number + config :port_udp, :validate => :number # Whether or not to remap the GELF message fields to Logstash event fields or # leave them intact. # # Remapping converts the following GELF fields to Logstash equivalents: @@ -57,50 +61,154 @@ MESSAGE_FIELD = "message" TAGS_FIELD = "tags" PARSE_FAILURE_TAG = "_jsonparsefailure" PARSE_FAILURE_LOG_MESSAGE = "JSON parse failure. Falling back to plain-text" + # Whether or not to use TCP or/and UDP + config :use_tcp, :validate => :boolean, :default => false + config :use_udp, :validate => :boolean, :default => true + public def initialize(params) super BasicSocket.do_not_reverse_lookup = true end # def initialize public def register require 'gelfd' - end # def register + @port_tcp ||= @port + @port_udp ||= @port + end public def run(output_queue) begin - # udp server - udp_listener(output_queue) + if @use_tcp + tcp_thr = Thread.new(output_queue) do |output_queue| + tcp_listener(output_queue) + end + end + if @use_udp + udp_thr = Thread.new(output_queue) do |output_queue| + udp_listener(output_queue) + end + end rescue => e unless stop? @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace) Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? } retry unless stop? end end # begin + if @use_tcp + tcp_thr.join + end + if @use_udp + udp_thr.join + end end # def run public def stop - @udp.close - rescue IOError # the plugin is currently shutting down, so its safe to ignore theses errors + begin + @udp.close if @use_udp + rescue IOError => e + @logger.warn("Caugh exception while closing udp socket", :exception => e.inspect) + end + begin + @tcp.close if @use_tcp + rescue IOError => e + @logger.warn("Caugh exception while closing tcp socket", :exception => e.inspect) + end end private + def tcp_listener(output_queue) + + @logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port_tcp}") + + if @tcp.nil? + @tcp = TCPServer.new(@host, @port_tcp) + end + + while !@shutdown_requested + Thread.new(@tcp.accept) do |client| + @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}") + + begin + while !client.nil? && !client.eof? + + begin # Read from socket + data_in = client.gets("\u0000") + rescue => ex + @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace) + end + + if data_in.nil? + @logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.") + next + end + + # data received. Remove trailing \0 + data_in[-1] == "\u0000" && data_in = data_in[0...-1] + begin # Parse JSON + jsonObj = JSON.parse(data_in) + rescue => ex + @logger.warn("Gelf (tcp): failed to parse a message. Skipping: " + data_in, :exception => ex, :backtrace => ex.backtrace) + next + end + + begin # Create event + event = LogStash::Event.new(jsonObj) + event.set(SOURCE_HOST_FIELD, host.force_encoding("UTF-8")) + if event.get("timestamp").is_a?(Numeric) + event.set("timestamp", LogStash::Timestamp.at(event.get("timestamp"))) + event.remove("timestamp") + end + remap_gelf(event) if @remap + strip_leading_underscore(event) if @strip_leading_underscore + decorate(event) + output_queue << event + rescue => ex + @logger.warn("Gelf (tcp): failed to create event from json object. Skipping: " + jsonObj.to_s, :exception => ex, :backtrace => ex.backtrace) + end + + end # while client + @logger.debug? && @logger.debug("Gelf (tcp): Closing client connection") + client.close + client = nil + rescue => ex + @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace) + ensure + if !client.nil? + @logger.debug? && @logger.debug("Gelf (tcp): Ensuring client is closed") + client.close + client = nil + end + end # begin client + end # Thread.new + end # @shutdown_requested + + end + + private def udp_listener(output_queue) - @logger.info("Starting gelf listener", :address => "#{@host}:#{@port}") + @logger.info("Starting gelf listener (udp) ...", :address => "#{@host}:#{@port_udp}") @udp = UDPSocket.new(Socket::AF_INET) - @udp.bind(@host, @port) + @udp.bind(@host, @port_udp) - while !stop? - line, client = @udp.recvfrom(8192) + while !@udp.closed? + begin + line, client = @udp.recvfrom(8192) + rescue => e + if !stop? # if we're shutting down there's no point in logging anything + @logger.error("Caught exception while reading from UDP socket", :exception => e.inspect) + end + next + end begin data = Gelfd::Parser.parse(line) rescue => ex @logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace) @@ -127,10 +235,10 @@ # @return [LogStash::Event] new event with parsed json gelf, assigned source host and coerced timestamp def self.new_event(json_gelf, host) event = parse(json_gelf) return if event.nil? - event.set(SOURCE_HOST_FIELD, host) + event.set(SOURCE_HOST_FIELD, host.force_encoding("UTF-8")) if (gelf_timestamp = event.get(TIMESTAMP_GELF_FIELD)).is_a?(Numeric) event.timestamp = self.coerce_timestamp(gelf_timestamp) event.remove(TIMESTAMP_GELF_FIELD) end