lib/syslogstash/syslog_reader.rb in syslogstash-1.3.0 vs lib/syslogstash/syslog_reader.rb in syslogstash-2.1.0

- old
+ new

@@ -1,66 +1,50 @@ -require_relative 'worker' - # A single socket reader. # class Syslogstash::SyslogReader - include Syslogstash::Worker + attr_reader :thread - attr_reader :file + def initialize(cfg, logstash, stats) + @file, @logstash, @stats = cfg.syslog_socket, logstash, stats - def initialize(file, config, logstash, metrics) - @file, @logstash, @metrics = file, logstash, metrics - config ||= {} - - @add_fields = config['add_fields'] || {} - @relay_to = config['relay_to'] || [] - - unless @add_fields.is_a? Hash - raise ArgumentError, - "add_fields parameter to socket #{file} must be a hash" - end - - unless @relay_to.is_a? Array - raise ArgumentError, - "relay_to parameter to socket #{file} must be an array" - end - - log { "initialized syslog socket #{file} with config #{config.inspect}" } + @add_fields = cfg.add_fields + @relay_to = cfg.relay_sockets + @cfg = cfg + @logger = cfg.logger end # Start reading from the socket file, parsing entries, and flinging # them at logstash. This method will return, with the operation # continuing in a separate thread. # def run - debug { "#run called" } + @logger.debug("reader") { "#run called" } begin socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) socket.bind(Socket.pack_sockaddr_un(@file)) File.chmod(0666, @file) rescue Errno::EEXIST, Errno::EADDRINUSE - log { "socket file #{@file} already exists; deleting" } + @logger.info("reader") { "socket file #{@file} already exists; deleting" } File.unlink(@file) rescue nil retry - rescue SystemCallError - log { "Error while trying to bind to #{@file}" } - raise + rescue StandardError => ex + raise ex.class, "Error while trying to bind to #{@file}: #{ex.message}", ex.backtrace end - @worker = Thread.new do + @thread = Thread.new do begin loop do msg = socket.recvmsg - debug { "Message received: #{msg.inspect}" } - @metrics.received(@file, Time.now) - process_message msg.first.chomp + @logger.debug("reader") { "Message received: #{msg.inspect}" } + @stats.received(@file) relay_message msg.first + process_message msg.first.chomp end ensure socket.close - log { "removing socket file #{@file}" } + @logger.debug("reader") { "removing socket file #{@file}" } File.unlink(@file) rescue nil end end end @@ -101,11 +85,11 @@ message: message, ).to_json @logstash.send_entry(log_entry) else - log { "Unparseable message: #{msg}" } + @logger.warn("reader") { "Unparseable message: #{msg.inspect}" } end end def log_entry(h) {}.tap do |e| @@ -116,43 +100,58 @@ h[:severity_name] = SEVERITIES[h[:severity]] e.merge!(h.delete_if { |k,v| v.nil? }) e.merge!(@add_fields) - debug { "Log entry is: #{e.inspect}" } + @logger.debug("reader") { "Complete log entry is: #{e.inspect}" } end end def relay_message(msg) @currently_failed ||= {} + if @cfg.relay_to_stdout + # This one's easy + puts msg.sub(/\A<\d+>/, '') + $stdout.flush + end + @relay_to.each do |f| s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) begin s.connect(Socket.pack_sockaddr_un(f)) rescue Errno::ENOENT # Socket doesn't exist; we don't care enough about this to bother # reporting it. People will figure it out themselves soon enough. rescue StandardError => ex - log { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" } + unless @currently_failed[f] + @logger.warn("reader") { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" } + @currently_failed[f] = true + end next end begin + # We really, *really* don't want to block the world just because + # whoever's on the other end of the relay socket can't process + # messages quick enough. s.sendmsg_nonblock(msg) if @currently_failed[f] - log { "Backlog on socket #{f} has cleared; messages are being delivered again" } + @logger.info("reader") { "Error on socket #{f} has cleared; messages are being delivered again" } @currently_failed[f] = false end rescue Errno::ENOTCONN - # Socket isn't being listened to. Not our problem. + unless @currently_failed[f] + @logger.debug("reader") { "Nothing is listening on socket #{f}" } + @currently_failed[f] = true + end rescue IO::EAGAINWaitWritable unless @currently_failed[f] - log { "Socket #{f} is backlogged; messages to this socket from socket #{@file} are being discarded undelivered" } + @logger.warn("reader") { "Socket #{f} is currently backlogged; messages to this socket are now being discarded undelivered" } @currently_failed[f] = true end rescue StandardError => ex - log { "Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})" } + @logger.warn("reader") { (["Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } end end end FACILITIES = %w{