lib/syslogstash/syslog_reader.rb in syslogstash-1.3.0 vs lib/syslogstash/syslog_reader.rb in syslogstash-2.1.0
- old
+ new
@@ -1,66 +1,50 @@
-require_relative 'worker'
-
# A single socket reader.
#
class Syslogstash::SyslogReader
- include Syslogstash::Worker
+ attr_reader :thread
- attr_reader :file
+ def initialize(cfg, logstash, stats)
+ @file, @logstash, @stats = cfg.syslog_socket, logstash, stats
- def initialize(file, config, logstash, metrics)
- @file, @logstash, @metrics = file, logstash, metrics
- config ||= {}
-
- @add_fields = config['add_fields'] || {}
- @relay_to = config['relay_to'] || []
-
- unless @add_fields.is_a? Hash
- raise ArgumentError,
- "add_fields parameter to socket #{file} must be a hash"
- end
-
- unless @relay_to.is_a? Array
- raise ArgumentError,
- "relay_to parameter to socket #{file} must be an array"
- end
-
- log { "initialized syslog socket #{file} with config #{config.inspect}" }
+ @add_fields = cfg.add_fields
+ @relay_to = cfg.relay_sockets
+ @cfg = cfg
+ @logger = cfg.logger
end
# Start reading from the socket file, parsing entries, and flinging
# them at logstash. This method will return, with the operation
# continuing in a separate thread.
#
def run
- debug { "#run called" }
+ @logger.debug("reader") { "#run called" }
begin
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
socket.bind(Socket.pack_sockaddr_un(@file))
File.chmod(0666, @file)
rescue Errno::EEXIST, Errno::EADDRINUSE
- log { "socket file #{@file} already exists; deleting" }
+ @logger.info("reader") { "socket file #{@file} already exists; deleting" }
File.unlink(@file) rescue nil
retry
- rescue SystemCallError
- log { "Error while trying to bind to #{@file}" }
- raise
+ rescue StandardError => ex
+ raise ex.class, "Error while trying to bind to #{@file}: #{ex.message}", ex.backtrace
end
- @worker = Thread.new do
+ @thread = Thread.new do
begin
loop do
msg = socket.recvmsg
- debug { "Message received: #{msg.inspect}" }
- @metrics.received(@file, Time.now)
- process_message msg.first.chomp
+ @logger.debug("reader") { "Message received: #{msg.inspect}" }
+ @stats.received(@file)
relay_message msg.first
+ process_message msg.first.chomp
end
ensure
socket.close
- log { "removing socket file #{@file}" }
+ @logger.debug("reader") { "removing socket file #{@file}" }
File.unlink(@file) rescue nil
end
end
end
@@ -101,11 +85,11 @@
message: message,
).to_json
@logstash.send_entry(log_entry)
else
- log { "Unparseable message: #{msg}" }
+ @logger.warn("reader") { "Unparseable message: #{msg.inspect}" }
end
end
def log_entry(h)
{}.tap do |e|
@@ -116,43 +100,58 @@
h[:severity_name] = SEVERITIES[h[:severity]]
e.merge!(h.delete_if { |k,v| v.nil? })
e.merge!(@add_fields)
- debug { "Log entry is: #{e.inspect}" }
+ @logger.debug("reader") { "Complete log entry is: #{e.inspect}" }
end
end
def relay_message(msg)
@currently_failed ||= {}
+ if @cfg.relay_to_stdout
+ # This one's easy
+ puts msg.sub(/\A<\d+>/, '')
+ $stdout.flush
+ end
+
@relay_to.each do |f|
s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
begin
s.connect(Socket.pack_sockaddr_un(f))
rescue Errno::ENOENT
# Socket doesn't exist; we don't care enough about this to bother
# reporting it. People will figure it out themselves soon enough.
rescue StandardError => ex
- log { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" }
+ unless @currently_failed[f]
+ @logger.warn("reader") { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" }
+ @currently_failed[f] = true
+ end
next
end
begin
+ # We really, *really* don't want to block the world just because
+ # whoever's on the other end of the relay socket can't process
+ # messages quick enough.
s.sendmsg_nonblock(msg)
if @currently_failed[f]
- log { "Backlog on socket #{f} has cleared; messages are being delivered again" }
+ @logger.info("reader") { "Error on socket #{f} has cleared; messages are being delivered again" }
@currently_failed[f] = false
end
rescue Errno::ENOTCONN
- # Socket isn't being listened to. Not our problem.
+ unless @currently_failed[f]
+ @logger.debug("reader") { "Nothing is listening on socket #{f}" }
+ @currently_failed[f] = true
+ end
rescue IO::EAGAINWaitWritable
unless @currently_failed[f]
- log { "Socket #{f} is backlogged; messages to this socket from socket #{@file} are being discarded undelivered" }
+ @logger.warn("reader") { "Socket #{f} is currently backlogged; messages to this socket are now being discarded undelivered" }
@currently_failed[f] = true
end
rescue StandardError => ex
- log { "Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})" }
+ @logger.warn("reader") { (["Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})"] + ex.backtrace).join("\n ") }
end
end
end
FACILITIES = %w{