lib/syslogstash/syslog_reader.rb in syslogstash-2.2.0 vs lib/syslogstash/syslog_reader.rb in syslogstash-3.0.0
- old
+ new
@@ -1,184 +1,216 @@
# A single socket reader.
#
class Syslogstash::SyslogReader
- attr_reader :thread
+ include ServiceSkeleton::BackgroundWorker
- def initialize(cfg, logstash, stats)
- @file, @logstash, @stats = cfg.syslog_socket, logstash, stats
+ def initialize(config, logstash, metrics)
+ @config, @logstash, @metrics = config, logstash, metrics
- @add_fields = cfg.add_fields
- @relay_to = cfg.relay_sockets
- @cfg = cfg
- @logger = cfg.logger
- end
+ @logger = config.logger
- # Start reading from the socket file, parsing entries, and flinging
- # them at logstash. This method will return, with the operation
- # continuing in a separate thread.
- #
- def run
- @logger.debug("reader") { "#run called" }
+ @shutdown_reader, @shutdown_writer = IO.pipe
- begin
- socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
- socket.bind(Socket.pack_sockaddr_un(@file))
- File.chmod(0666, @file)
- rescue Errno::EEXIST, Errno::EADDRINUSE
- @logger.info("reader") { "socket file #{@file} already exists; deleting" }
- File.unlink(@file) rescue nil
- retry
- rescue StandardError => ex
- raise ex.class, "Error while trying to bind to #{@file}: #{ex.message}", ex.backtrace
- end
+ super
+ end
- @thread = Thread.new do
- begin
- loop do
- msg = socket.recvmsg
- @logger.debug("reader") { "Message received: #{msg.inspect}" }
- @stats.received(@file)
- relay_message msg.first
- process_message msg.first.chomp
- end
- ensure
- socket.close
- @logger.debug("reader") { "removing socket file #{@file}" }
- File.unlink(@file) rescue nil
- end
- end
- end
+ # Start reading from the socket file, parsing entries, and flinging
+ # them at logstash.
+ #
+ def start
+ config.logger.debug(logloc) { "off we go!" }
- private
+ begin
+ socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
+ socket.bind(Socket.pack_sockaddr_un(config.syslog_socket))
+ File.chmod(0666, config.syslog_socket)
+ rescue Errno::EEXIST, Errno::EADDRINUSE
+ config.logger.info(logloc) { "socket file #{config.syslog_socket} already exists; deleting" }
+ File.unlink(config.syslog_socket) rescue nil
+ retry
+ rescue StandardError => ex
+ raise ex.class, "Error while trying to bind to #{config.syslog_socket}: #{ex.message}", ex.backtrace
+ end
- def process_message(msg)
- if msg =~ /^<(\d+)>(\w{3} [ 0-9]{2} [0-9:]{8}) (.*)$/
- flags = $1.to_i
- timestamp = $2
- content = $3
+ begin
+ loop do
+ IO.select([@shutdown_reader, socket]).first.each do |fd|
+ if fd == socket
+ begin
+ msg = socket.recvmsg_nonblock
+ rescue IO::WaitWritable
+ config.logger.debug(logloc) { "select said a message was waiting, but it wasn't. o.O" }
+ else
+ config.logger.debug(logloc) { "Message received: #{msg.inspect}" }
+ @metrics.messages_received_total.increment(socket_path: config.syslog_socket)
+ @metrics.queue_size.increment({})
+ relay_message msg.first
+ process_message msg.first.chomp
+ end
+ elsif fd == @shutdown_reader
+ @shutdown_reader.close
+ config.logger.debug(logloc) { "Tripped over shutdown reader" }
+ break
+ end
+ end
+ end
+ ensure
+ socket.close
+ config.logger.debug(logloc) { "removing socket file #{config.syslog_socket}" }
+ File.unlink(config.syslog_socket) rescue nil
+ end
+ end
- # Lo! the many ways that syslog messages can be formatted
- hostname, program, pid, message = case content
- # the gold standard: hostname, program name with optional PID
- when /^([a-zA-Z0-9._-]*[^:]) (\S+?)(\[(\d+)\])?: (.*)$/
- [$1, $2, $4, $5]
- # hostname, no program name
- when /^([a-zA-Z0-9._-]+) (\S+[^:] .*)$/
- [$1, nil, nil, $2]
- # program name, no hostname (yeah, you heard me, non-RFC compliant!)
- when /^(\S+?)(\[(\d+)\])?: (.*)$/
- [nil, $1, $3, $4]
- else
- # I have NFI
- [nil, nil, nil, content]
- end
+ def shutdown
+ @shutdown_writer.close
+ end
- severity = flags % 8
- facility = flags / 8
+ private
- log_entry = log_entry(
- syslog_timestamp: timestamp,
- severity: severity,
- facility: facility,
- hostname: hostname,
- program: program,
- pid: pid.nil? ? nil : pid.to_i,
- message: message,
- ).to_json
+ attr_reader :config, :logger
- @logstash.send_entry(log_entry)
- else
- @logger.warn("reader") { "Unparseable message: #{msg.inspect}" }
- end
- end
+ def process_message(msg)
+ if msg =~ /^<(\d+)>(\w{3} [ 0-9]{2} [0-9:]{8}) (.*)$/
+ flags = $1.to_i
+ timestamp = $2
+ content = $3
- def log_entry(h)
- {}.tap do |e|
- e['@version'] = '1'
- e['@timestamp'] = Time.now.utc.strftime("%FT%T.%LZ")
+ # Lo! the many ways that syslog messages can be formatted
+ hostname, program, pid, message = case content
+ # the gold standard: hostname, program name with optional PID
+ when /^([a-zA-Z0-9._-]*[^:]) (\S+?)(\[(\d+)\])?: (.*)$/
+ [$1, $2, $4, $5]
+ # hostname, no program name
+ when /^([a-zA-Z0-9._-]+) (\S+[^:] .*)$/
+ [$1, nil, nil, $2]
+ # program name, no hostname (yeah, you heard me, non-RFC compliant!)
+ when /^(\S+?)(\[(\d+)\])?: (.*)$/
+ [nil, $1, $3, $4]
+ else
+ # I have NFI
+ [nil, nil, nil, content]
+ end
- h[:facility_name] = FACILITIES[h[:facility]]
- h[:severity_name] = SEVERITIES[h[:severity]]
+ if config.drop_regex && message && message.match?(config.drop_regex)
+ @metrics.dropped_total.increment({})
+ config.logger.debug(logloc) { "dropping message #{message}" }
+ return
+ end
- e.merge!(h.delete_if { |k,v| v.nil? })
- e.merge!(@add_fields)
+ severity = flags % 8
+ facility = flags / 8
- @logger.debug("reader") { "Complete log entry is: #{e.inspect}" }
- end
- end
+ log_entry = log_entry(
+ syslog_timestamp: timestamp,
+ severity: severity,
+ facility: facility,
+ hostname: hostname,
+ program: program,
+ pid: pid.nil? ? nil : pid.to_i,
+ message: message,
+ )
- def relay_message(msg)
- @currently_failed ||= {}
+ @logstash.send_event(log_entry)
+ else
+ config.logger.warn(logloc) { "Unparseable message: #{msg.inspect}" }
+ end
+ end
- if @cfg.relay_to_stdout
- # This one's easy
- puts msg.sub(/\A<\d+>/, '')
- $stdout.flush
- end
+ def log_entry(h)
+ {}.tap do |e|
+ e['@version'] = '1'
+ e['@timestamp'] = Time.now.utc.strftime("%FT%T.%LZ")
- @relay_to.each do |f|
- s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
- begin
- s.connect(Socket.pack_sockaddr_un(f))
- rescue Errno::ENOENT
- # Socket doesn't exist; we don't care enough about this to bother
- # reporting it. People will figure it out themselves soon enough.
- rescue StandardError => ex
- unless @currently_failed[f]
- @logger.warn("reader") { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" }
- @currently_failed[f] = true
- end
- next
- end
+ h[:facility_name] = FACILITIES[h[:facility]]
+ h[:severity_name] = SEVERITIES[h[:severity]]
- begin
- # We really, *really* don't want to block the world just because
- # whoever's on the other end of the relay socket can't process
- # messages quick enough.
- s.sendmsg_nonblock(msg)
- if @currently_failed[f]
- @logger.info("reader") { "Error on socket #{f} has cleared; messages are being delivered again" }
- @currently_failed[f] = false
- end
- rescue Errno::ENOTCONN
- unless @currently_failed[f]
- @logger.debug("reader") { "Nothing is listening on socket #{f}" }
- @currently_failed[f] = true
- end
- rescue IO::EAGAINWaitWritable
- unless @currently_failed[f]
- @logger.warn("reader") { "Socket #{f} is currently backlogged; messages to this socket are now being discarded undelivered" }
- @currently_failed[f] = true
- end
- rescue StandardError => ex
- @logger.warn("reader") { (["Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
- end
- end
- end
+ e.merge!(h.delete_if { |k,v| v.nil? })
+ e.merge!(config.add_fields)
- FACILITIES = %w{
- kern
- user
- mail
- daemon
- auth
- syslog
- lpr
- news
- uucp
- cron
- authpriv
- ftp
- local0 local1 local2 local3 local4 local5 local6 local7
- }
+ config.logger.debug(logloc) { "Complete log entry is: #{e.inspect}" }
+ end
+ end
- SEVERITIES = %w{
- emerg
- alert
- crit
- err
- warning
- notice
- info
- debug
- }
+ def relay_message(msg)
+ @currently_failed ||= {}
+
+ if config.relay_to_stdout
+ # This one's easy
+ puts msg.sub(/\A<\d+>/, '')
+ $stdout.flush
+ end
+
+ config.relay_sockets.each do |f|
+ relay_to_socket(f)
+ end
+ end
+
+ def relay_to_socket(f)
+ begin
+ s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
+ begin
+ s.connect(Socket.pack_sockaddr_un(f))
+ rescue Errno::ENOENT
+ # Socket doesn't exist; we don't care enough about this to bother
+ # reporting it. People will figure it out themselves soon enough.
+ rescue StandardError => ex
+ unless @currently_failed[f]
+ config.logger.warn(logloc) { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" }
+ @currently_failed[f] = true
+ end
+ return
+ end
+
+ begin
+ # We really, *really* don't want to block the world just because
+ # whoever's on the other end of the relay socket can't process
+ # messages quick enough.
+ s.sendmsg_nonblock(msg)
+ if @currently_failed[f]
+ config.logger.info(logloc) { "Error on socket #{f} has cleared; messages are being delivered again" }
+ @currently_failed[f] = false
+ end
+ rescue Errno::ENOTCONN
+ unless @currently_failed[f]
+ config.logger.debug(logloc) { "Nothing is listening on socket #{f}" }
+ @currently_failed[f] = true
+ end
+ rescue IO::EAGAINWaitWritable
+ unless @currently_failed[f]
+ config.logger.warn(logloc) { "Socket #{f} is currently backlogged; messages to this socket are now being discarded undelivered" }
+ @currently_failed[f] = true
+ end
+ rescue StandardError => ex
+ config.logger.warn(logloc) { (["Failed to relay message to socket #{f} from #{config.syslog_socket}: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
+ end
+ ensure
+ s.close
+ end
+ end
+
+ FACILITIES = %w{
+ kern
+ user
+ mail
+ daemon
+ auth
+ syslog
+ lpr
+ news
+ uucp
+ cron
+ authpriv
+ ftp
+ local0 local1 local2 local3 local4 local5 local6 local7
+ }
+
+ SEVERITIES = %w{
+ emerg
+ alert
+ crit
+ err
+ warning
+ notice
+ info
+ debug
+ }
end