lib/syslogstash/syslog_reader.rb in syslogstash-0.4.1 vs lib/syslogstash/syslog_reader.rb in syslogstash-1.0.0

- old
+ new

@@ -5,14 +5,28 @@ class Syslogstash::SyslogReader include Syslogstash::Worker attr_reader :file - def initialize(file, tags, logstash, metrics) - @file, @tags, @logstash, @metrics = file, tags, logstash, metrics + def initialize(file, config, logstash, metrics) + @file, @logstash, @metrics = file, logstash, metrics + config ||= {} - log { "initializing syslog socket #{file} with tags #{tags.inspect}" } + @add_fields = config['add_fields'] || {} + @relay_to = config['relay_to'] || [] + + unless @add_fields.is_a? Hash + raise ArgumentError, + "add_fields parameter to socket #{file} must be a hash" + end + + unless @relay_to.is_a? Array + raise ArgumentError, + "relay_to parameter to socket #{file} must be an array" + end + + log { "initialized syslog socket #{file} with config #{config.inspect}" } end # Start reading from the socket file, parsing entries, and flinging # them at logstash. This method will return, with the operation # continuing in a separate thread. @@ -26,21 +40,22 @@ rescue Errno::EEXIST, Errno::EADDRINUSE log { "socket file #{@file} already exists; deleting" } File.unlink(@file) rescue nil retry rescue SystemCallError - $stderr.puts "Error while trying to bind to #{@file}" + log { "Error while trying to bind to #{@file}" } raise end @worker = Thread.new do begin loop do msg = socket.recvmsg debug { "Message received: #{msg.inspect}" } @metrics.received(@file, Time.now) process_message msg.first.chomp + relay_message msg.first end ensure socket.close log { "removing socket file #{@file}" } File.unlink(@file) rescue nil @@ -85,11 +100,11 @@ message: message, ).to_json @logstash.send_entry(log_entry) else - $stderr.puts "Unparseable message: #{msg}" + log { "Unparseable message: #{msg}" } end end def log_entry(h) {}.tap do |e| @@ -98,13 +113,46 @@ h[:facility_name] = FACILITIES[h[:facility]] h[:severity_name] = SEVERITIES[h[:severity]] e.merge!(h.delete_if { |k,v| v.nil? }) + e.merge!(@add_fields) - e.merge!(@tags) if @tags.is_a? Hash - debug { "Log entry is: #{e.inspect}" } + end + end + + def relay_message(msg) + @currently_failed ||= {} + + @relay_to.each do |f| + s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) + begin + s.connect(Socket.pack_sockaddr_un(f)) + rescue Errno::ENOENT + # Socket doesn't exist; we don't care enough about this to bother + # reporting it. People will figure it out themselves soon enough. + rescue StandardError => ex + log { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" } + next + end + + begin + s.sendmsg_nonblock(msg) + if @currently_failed[f] + log { "Backlog on socket #{f} has cleared; messages are being delivered again" } + @currently_failed[f] = false + end + rescue Errno::ENOTCONN + # Socket isn't being listened to. Not our problem. + rescue IO::EAGAINWaitWritable + unless @currently_failed[f] + log { "Socket #{f} is backlogged; messages to this socket from socket #{@file} are being discarded undelivered" } + @currently_failed = true + end + rescue StandardError => ex + log { "Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})" } + end end end FACILITIES = %w{ kern