lib/syslogstash/syslog_reader.rb in syslogstash-0.4.1 vs lib/syslogstash/syslog_reader.rb in syslogstash-1.0.0
- old
+ new
@@ -5,14 +5,28 @@
class Syslogstash::SyslogReader
include Syslogstash::Worker
attr_reader :file
- def initialize(file, tags, logstash, metrics)
- @file, @tags, @logstash, @metrics = file, tags, logstash, metrics
+ def initialize(file, config, logstash, metrics)
+ @file, @logstash, @metrics = file, logstash, metrics
+ config ||= {}
- log { "initializing syslog socket #{file} with tags #{tags.inspect}" }
+ @add_fields = config['add_fields'] || {}
+ @relay_to = config['relay_to'] || []
+
+ unless @add_fields.is_a? Hash
+ raise ArgumentError,
+ "add_fields parameter to socket #{file} must be a hash"
+ end
+
+ unless @relay_to.is_a? Array
+ raise ArgumentError,
+ "relay_to parameter to socket #{file} must be an array"
+ end
+
+ log { "initialized syslog socket #{file} with config #{config.inspect}" }
end
# Start reading from the socket file, parsing entries, and flinging
# them at logstash. This method will return, with the operation
# continuing in a separate thread.
@@ -26,21 +40,22 @@
rescue Errno::EEXIST, Errno::EADDRINUSE
log { "socket file #{@file} already exists; deleting" }
File.unlink(@file) rescue nil
retry
rescue SystemCallError
- $stderr.puts "Error while trying to bind to #{@file}"
+ log { "Error while trying to bind to #{@file}" }
raise
end
@worker = Thread.new do
begin
loop do
msg = socket.recvmsg
debug { "Message received: #{msg.inspect}" }
@metrics.received(@file, Time.now)
process_message msg.first.chomp
+ relay_message msg.first
end
ensure
socket.close
log { "removing socket file #{@file}" }
File.unlink(@file) rescue nil
@@ -85,11 +100,11 @@
message: message,
).to_json
@logstash.send_entry(log_entry)
else
- $stderr.puts "Unparseable message: #{msg}"
+ log { "Unparseable message: #{msg}" }
end
end
def log_entry(h)
{}.tap do |e|
@@ -98,13 +113,46 @@
h[:facility_name] = FACILITIES[h[:facility]]
h[:severity_name] = SEVERITIES[h[:severity]]
e.merge!(h.delete_if { |k,v| v.nil? })
+ e.merge!(@add_fields)
- e.merge!(@tags) if @tags.is_a? Hash
-
debug { "Log entry is: #{e.inspect}" }
+ end
+ end
+
+ def relay_message(msg)
+ @currently_failed ||= {}
+
+ @relay_to.each do |f|
+ s = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
+ begin
+ s.connect(Socket.pack_sockaddr_un(f))
+ rescue Errno::ENOENT
+ # Socket doesn't exist; we don't care enough about this to bother
+ # reporting it. People will figure it out themselves soon enough.
+ rescue StandardError => ex
+ log { "Error while connecting to relay socket #{f}: #{ex.message} (#{ex.class})" }
+ next
+ end
+
+ begin
+ s.sendmsg_nonblock(msg)
+ if @currently_failed[f]
+ log { "Backlog on socket #{f} has cleared; messages are being delivered again" }
+ @currently_failed[f] = false
+ end
+ rescue Errno::ENOTCONN
+ # Socket isn't being listened to. Not our problem.
+ rescue IO::EAGAINWaitWritable
+ unless @currently_failed[f]
+ log { "Socket #{f} is backlogged; messages to this socket from socket #{@file} are being discarded undelivered" }
+ @currently_failed = true
+ end
+ rescue StandardError => ex
+ log { "Failed to relay message to socket #{f} from #{@file}: #{ex.message} (#{ex.class})" }
+ end
end
end
FACILITIES = %w{
kern