lib/syslogstash/syslog_reader.rb in syslogstash-2.2.0 vs lib/syslogstash/syslog_reader.rb in syslogstash-3.0.0

- old
+ new

@@ -1,184 +1,216 @@ # A single socket reader. # class Syslogstash::SyslogReader - attr_reader :thread + include ServiceSkeleton::BackgroundWorker - def initialize(cfg, logstash, stats) - @file, @logstash, @stats = cfg.syslog_socket, logstash, stats + def initialize(config, logstash, metrics) + @config, @logstash, @metrics = config, logstash, metrics - @add_fields = cfg.add_fields - @relay_to = cfg.relay_sockets - @cfg = cfg - @logger = cfg.logger - end + @logger = config.logger - # Start reading from the socket file, parsing entries, and flinging - # them at logstash. This method will return, with the operation - # continuing in a separate thread. - # - def run - @logger.debug("reader") { "#run called" } + @shutdown_reader, @shutdown_writer = IO.pipe - begin - socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) - socket.bind(Socket.pack_sockaddr_un(@file)) - File.chmod(0666, @file) - rescue Errno::EEXIST, Errno::EADDRINUSE - @logger.info("reader") { "socket file #{@file} already exists; deleting" } - File.unlink(@file) rescue nil - retry - rescue StandardError => ex - raise ex.class, "Error while trying to bind to #{@file}: #{ex.message}", ex.backtrace - end + super + end - @thread = Thread.new do - begin - loop do - msg = socket.recvmsg - @logger.debug("reader") { "Message received: #{msg.inspect}" } - @stats.received(@file) - relay_message msg.first - process_message msg.first.chomp - end - ensure - socket.close - @logger.debug("reader") { "removing socket file #{@file}" } - File.unlink(@file) rescue nil - end - end - end + # Start reading from the socket file, parsing entries, and flinging + # them at logstash. + # + def start + config.logger.debug(logloc) { "off we go!" } - private + begin + socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) + socket.bind(Socket.pack_sockaddr_un(config.syslog_socket)) + File.chmod(0666, config.syslog_socket) + rescue Errno::EEXIST, Errno::EADDRINUSE + config.logger.info(logloc) { "socket file #{config.syslog_socket} already exists; deleting" } + File.unlink(config.syslog_socket) rescue nil + retry + rescue StandardError => ex + raise ex.class, "Error while trying to bind to #{config.syslog_socket}: #{ex.message}", ex.backtrace + end - def process_message(msg) - if msg =~ /^<(\d+)>(\w{3} [ 0-9]{2} [0-9:]{8}) (.*)$/ - flags = $1.to_i - timestamp = $2 - content = $3 + begin + loop do + IO.select([@shutdown_reader, socket]).first.each do |fd| + if fd == socket + begin + msg = socket.recvmsg_nonblock + rescue IO::WaitWritable + config.logger.debug(logloc) { "select said a message was waiting, but it wasn't. o.O" } + else + config.logger.debug(logloc) { "Message received: #{msg.inspect}" } + @metrics.messages_received_total.increment(socket_path: config.syslog_socket) + @metrics.queue_size.increment({}) + relay_message msg.first + process_message msg.first.chomp + end + elsif fd == @shutdown_reader + @shutdown_reader.close + config.logger.debug(logloc) { "Tripped over shutdown reader" } + break + end + end + end + ensure + socket.close + config.logger.debug(logloc) { "removing socket file #{config.syslog_socket}" } + File.unlink(config.syslog_socket) rescue nil + end + end - # Lo! the many ways that syslog messages can be formatted - hostname, program, pid, message = case content - # the gold standard: hostname, program name with optional PID - when /^([a-zA-Z0-9._-]*[^:]) (\S+?)(\[(\d+)\])?: (.*)$/ - [$1, $2, $4, $5] - # hostname, no program name - when /^([a-zA-Z0-9._-]+) (\S+[^:] .*)$/ - [$1, nil, nil, $2] - # program name, no hostname (yeah, you heard me, non-RFC compliant!) - when /^(\S+?)(\[(\d+)\])?: (.*)$/ - [nil, $1, $3, $4] - else - # I have NFI - [nil, nil, nil, content] - end + def shutdown + @shutdown_writer.close + end - severity = flags % 8 - facility = flags / 8 + private - log_entry = log_entry( - syslog_timestamp: timestamp, - severity: severity, - facility: facility, - hostname: hostname, - program: program, - pid: pid.nil? ? nil : pid.to_i, - message: message, - ).to_json + attr_reader :config, :logger - @logstash.send_entry(log_entry) - else - @logger.warn("reader") { "Unparseable message: #{msg.inspect}" } - end - end + def process_message(msg) + if msg =~ /^<(\d+)>(\w{3} [ 0-9]{2} [0-9:]{8}) (.*)$/ + flags = $1.to_i + timestamp = $2 + content = $3 - def log_entry(h) - {}.tap do |e| - e['@version'] = '1' - e['@timestamp'] = Time.now.utc.strftime("%FT%T.%LZ") + # Lo! the many ways that syslog messages can be formatted + hostname, program, pid, message = case content + # the gold standard: hostname, program name with optional PID + when /^([a-zA-Z0-9._-]*[^:]) (\S+?)(\[(\d+)\])?: (.*)$/ + [$1, $2, $4, $5] + # hostname, no program name + when /^([a-zA-Z0-9._-]+) (\S+[^:] .*)$/ + [$1, nil, nil, $2] + # program name, no hostname (yeah, you heard me, non-RFC compliant!) + when /^(\S+?)(\[(\d+)\])?: (.*)$/ + [nil, $1, $3, $4] + else + # I have NFI + [nil, nil, nil, content] + end - h[:facility_name] = FACILITIES[h[:facility]] - h[:severity_name] = SEVERITIES[h[:severity]] + if config.drop_regex && message && message.match?(config.drop_regex) + @metrics.dropped_total.increment({}) + config.logger.debug(logloc) { "dropping message #{message}" } + return + end - e.merge!(h.delete_if { |k,v| v.nil? }) - e.merge!(@add_fields) + severity = flags % 8 + facility = flags / 8 - @logger.debug("reader") { "Complete log entry is: #{e.inspect}" } - end - end + log_entry = log_entry( + syslog_timestamp: timestamp, + severity: severity, + facility: facility, + hostname: hostname, + program: program, + pid: pid.nil? ? nil : pid.to_i, + message: message, + ) - def relay_message(msg) - @currently_failed ||= {} + @logstash.send_event(log_entry) + else + config.logger.warn(logloc) { "Unparseable message: #{msg.inspect}" } + end + end - if @cfg.relay_to_stdout - # This one's easy - puts msg.sub(/\A<\d+>/, '') - $stdout.flush - end + def log_entry(h) + {}.tap do |e| + e['@version'] = '1' + e['@timestamp'] = Time.now.utc.strftime("%FT%T.%LZ") - @relay_to.each do |f| - s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) - begin - s.connect(Socket.pack_sockaddr_un(f)) - rescue Errno::ENOENT - # Socket doesn't exist; we don't care enough about this to bother - # reporting it. People will figure it out themselves soon enough. - rescue StandardError => ex - unless @currently_failed[f] - @logger.warn("reader") { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" } - @currently_failed[f] = true - end - next - end + h[:facility_name] = FACILITIES[h[:facility]] + h[:severity_name] = SEVERITIES[h[:severity]] - begin - # We really, *really* don't want to block the world just because - # whoever's on the other end of the relay socket can't process - # messages quick enough. - s.sendmsg_nonblock(msg) - if @currently_failed[f] - @logger.info("reader") { "Error on socket #{f} has cleared; messages are being delivered again" } - @currently_failed[f] = false - end - rescue Errno::ENOTCONN - unless @currently_failed[f] - @logger.debug("reader") { "Nothing is listening on socket #{f}" } - @currently_failed[f] = true - end - rescue IO::EAGAINWaitWritable - unless @currently_failed[f] - @logger.warn("reader") { "Socket #{f} is currently backlogged; messages to this socket are now being discarded undelivered" } - @currently_failed[f] = true - end - rescue StandardError => ex - @logger.warn("reader") { (["Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } - end - end - end + e.merge!(h.delete_if { |k,v| v.nil? }) + e.merge!(config.add_fields) - FACILITIES = %w{ - kern - user - mail - daemon - auth - syslog - lpr - news - uucp - cron - authpriv - ftp - local0 local1 local2 local3 local4 local5 local6 local7 - } + config.logger.debug(logloc) { "Complete log entry is: #{e.inspect}" } + end + end - SEVERITIES = %w{ - emerg - alert - crit - err - warning - notice - info - debug - } + def relay_message(msg) + @currently_failed ||= {} + + if config.relay_to_stdout + # This one's easy + puts msg.sub(/\A<\d+>/, '') + $stdout.flush + end + + config.relay_sockets.each do |f| + relay_to_socket(f) + end + end + + def relay_to_socket(f) + begin + s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0) + begin + s.connect(Socket.pack_sockaddr_un(f)) + rescue Errno::ENOENT + # Socket doesn't exist; we don't care enough about this to bother + # reporting it. People will figure it out themselves soon enough. + rescue StandardError => ex + unless @currently_failed[f] + config.logger.warn(logloc) { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" } + @currently_failed[f] = true + end + return + end + + begin + # We really, *really* don't want to block the world just because + # whoever's on the other end of the relay socket can't process + # messages quick enough. + s.sendmsg_nonblock(msg) + if @currently_failed[f] + config.logger.info(logloc) { "Error on socket #{f} has cleared; messages are being delivered again" } + @currently_failed[f] = false + end + rescue Errno::ENOTCONN + unless @currently_failed[f] + config.logger.debug(logloc) { "Nothing is listening on socket #{f}" } + @currently_failed[f] = true + end + rescue IO::EAGAINWaitWritable + unless @currently_failed[f] + config.logger.warn(logloc) { "Socket #{f} is currently backlogged; messages to this socket are now being discarded undelivered" } + @currently_failed[f] = true + end + rescue StandardError => ex + config.logger.warn(logloc) { (["Failed to relay message to socket #{f} from #{config.syslog_socket}: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") } + end + ensure + s.close + end + end + + FACILITIES = %w{ + kern + user + mail + daemon + auth + syslog + lpr + news + uucp + cron + authpriv + ftp + local0 local1 local2 local3 local4 local5 local6 local7 + } + + SEVERITIES = %w{ + emerg + alert + crit + err + warning + notice + info + debug + } end