lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.0.7 vs lib/logstash/inputs/gelf.rb in logstash-input-gelf-3.1.0
- old
+ new
@@ -4,10 +4,11 @@
require "logstash/json"
require "logstash/timestamp"
require "stud/interval"
require "date"
require "socket"
+require "json"
# This input will read GELF messages as events over the network,
# making it a good choice if you already use Graylog2 today.
#
# The main use case for this input is to leverage existing GELF
@@ -27,13 +28,16 @@
default :codec, "plain"
# The IP address or hostname to listen on.
config :host, :validate => :string, :default => "0.0.0.0"
- # The port to listen on. Remember that ports less than 1024 (privileged
+ # The ports to listen on. Remember that ports less than 1024 (privileged
# ports) may require root to use.
+ # port_tcp and port_udp can be used to have a different port for udp than the tcp port.
config :port, :validate => :number, :default => 12201
+ config :port_tcp, :validate => :number
+ config :port_udp, :validate => :number
# Whether or not to remap the GELF message fields to Logstash event fields or
# leave them intact.
#
# Remapping converts the following GELF fields to Logstash equivalents:
@@ -57,50 +61,154 @@
MESSAGE_FIELD = "message"
TAGS_FIELD = "tags"
PARSE_FAILURE_TAG = "_jsonparsefailure"
PARSE_FAILURE_LOG_MESSAGE = "JSON parse failure. Falling back to plain-text"
+ # Whether or not to use TCP or/and UDP
+ config :use_tcp, :validate => :boolean, :default => false
+ config :use_udp, :validate => :boolean, :default => true
+
public
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
end # def initialize
public
def register
require 'gelfd'
- end # def register
+ @port_tcp ||= @port
+ @port_udp ||= @port
+ end
public
def run(output_queue)
begin
- # udp server
- udp_listener(output_queue)
+ if @use_tcp
+ tcp_thr = Thread.new(output_queue) do |output_queue|
+ tcp_listener(output_queue)
+ end
+ end
+ if @use_udp
+ udp_thr = Thread.new(output_queue) do |output_queue|
+ udp_listener(output_queue)
+ end
+ end
rescue => e
unless stop?
@logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? }
retry unless stop?
end
end # begin
+ if @use_tcp
+ tcp_thr.join
+ end
+ if @use_udp
+ udp_thr.join
+ end
end # def run
public
def stop
- @udp.close
- rescue IOError # the plugin is currently shutting down, so its safe to ignore theses errors
+ begin
+ @udp.close if @use_udp
+ rescue IOError => e
+ @logger.warn("Caugh exception while closing udp socket", :exception => e.inspect)
+ end
+ begin
+ @tcp.close if @use_tcp
+ rescue IOError => e
+ @logger.warn("Caugh exception while closing tcp socket", :exception => e.inspect)
+ end
end
private
+ def tcp_listener(output_queue)
+
+ @logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port_tcp}")
+
+ if @tcp.nil?
+ @tcp = TCPServer.new(@host, @port_tcp)
+ end
+
+ while !@shutdown_requested
+ Thread.new(@tcp.accept) do |client|
+ @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}")
+
+ begin
+ while !client.nil? && !client.eof?
+
+ begin # Read from socket
+ data_in = client.gets("\u0000")
+ rescue => ex
+ @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
+ end
+
+ if data_in.nil?
+ @logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
+ next
+ end
+
+ # data received. Remove trailing \0
+ data_in[-1] == "\u0000" && data_in = data_in[0...-1]
+ begin # Parse JSON
+ jsonObj = JSON.parse(data_in)
+ rescue => ex
+ @logger.warn("Gelf (tcp): failed to parse a message. Skipping: " + data_in, :exception => ex, :backtrace => ex.backtrace)
+ next
+ end
+
+ begin # Create event
+ event = LogStash::Event.new(jsonObj)
+ event.set(SOURCE_HOST_FIELD, host.force_encoding("UTF-8"))
+ if event.get("timestamp").is_a?(Numeric)
+ event.set("timestamp", LogStash::Timestamp.at(event.get("timestamp")))
+ event.remove("timestamp")
+ end
+ remap_gelf(event) if @remap
+ strip_leading_underscore(event) if @strip_leading_underscore
+ decorate(event)
+ output_queue << event
+ rescue => ex
+ @logger.warn("Gelf (tcp): failed to create event from json object. Skipping: " + jsonObj.to_s, :exception => ex, :backtrace => ex.backtrace)
+ end
+
+ end # while client
+ @logger.debug? && @logger.debug("Gelf (tcp): Closing client connection")
+ client.close
+ client = nil
+ rescue => ex
+ @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace)
+ ensure
+ if !client.nil?
+ @logger.debug? && @logger.debug("Gelf (tcp): Ensuring client is closed")
+ client.close
+ client = nil
+ end
+ end # begin client
+ end # Thread.new
+ end # @shutdown_requested
+
+ end
+
+ private
def udp_listener(output_queue)
- @logger.info("Starting gelf listener", :address => "#{@host}:#{@port}")
+ @logger.info("Starting gelf listener (udp) ...", :address => "#{@host}:#{@port_udp}")
@udp = UDPSocket.new(Socket::AF_INET)
- @udp.bind(@host, @port)
+ @udp.bind(@host, @port_udp)
- while !stop?
- line, client = @udp.recvfrom(8192)
+ while !@udp.closed?
+ begin
+ line, client = @udp.recvfrom(8192)
+ rescue => e
+ if !stop? # if we're shutting down there's no point in logging anything
+ @logger.error("Caught exception while reading from UDP socket", :exception => e.inspect)
+ end
+ next
+ end
begin
data = Gelfd::Parser.parse(line)
rescue => ex
@logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
@@ -127,10 +235,10 @@
# @return [LogStash::Event] new event with parsed json gelf, assigned source host and coerced timestamp
def self.new_event(json_gelf, host)
event = parse(json_gelf)
return if event.nil?
- event.set(SOURCE_HOST_FIELD, host)
+ event.set(SOURCE_HOST_FIELD, host.force_encoding("UTF-8"))
if (gelf_timestamp = event.get(TIMESTAMP_GELF_FIELD)).is_a?(Numeric)
event.timestamp = self.coerce_timestamp(gelf_timestamp)
event.remove(TIMESTAMP_GELF_FIELD)
end