lib/remote_syslog/reader.rb in remote_syslog-1.0.0 vs lib/remote_syslog/reader.rb in remote_syslog-1.1.0

- old
+ new

@@ -1,57 +1,69 @@ +require 'eventmachine' +require 'eventmachine-tail' +require 'em-dns-resolver' +require 'syslog_protocol' + module RemoteSyslog class Reader < EventMachine::FileTail - def initialize(path, dest_addr, dest_port, options = {}) - @dest_addr = dest_addr - @dest_port = dest_port.to_i - + COLORED_REGEXP = /\e\[(?:(?:[34][0-7]|[0-9]);){0,2}(?:[34][0-7]|[0-9])m/ + + def initialize(path, destination_address, destination_port, options = {}) + super(path, -1) + + @destination_address = destination_address + @destination_port = destination_port.to_i + + @strip_color = options[:strip_color] + @socket = options[:socket] || EventMachine.open_datagram_socket('0.0.0.0', 0) - @program = options[:program] || File.basename(path) || 'remote_syslog' - @hostname = options[:hostname] || `hostname`.strip - @hostname = 'localhost' unless @hostname && @hostname != '' - - if options[:severity] - @severity = severity_value(options[:severity]) || raise(ArgumentError, "Invalid severity: #{options[:severity]} (valid: #{severities.keys.join(', ')})") - else - @severity = severity_value(:notice) + + @buffer = BufferedTokenizer.new + + @packet = SyslogProtocol::Packet.new + + local_hostname = options[:hostname] || (Socket.gethostname rescue `hostname`.chomp) + if local_hostname.nil? || local_hostname.empty? + local_hostname = 'localhost' end - - if options[:facility] - @facility = facility_value(options[:facility]) || raise(ArgumentError, "Invalid facility: #{options[:facility]} (valid: #{facilities.keys.join(', ')}") - else - @facility = facility_value(:user) + + @packet.hostname = local_hostname + @packet.facility = options[:facility] || 'user' + @packet.severity = options[:severity] || 'notice' + @packet.tag = options[:program] || File.basename(path) || File.basename($0) + + # Try to resolve the destination address + resolve_destination_address + + # Every 60 seconds we'll see if the address has changed + EventMachine.add_periodic_timer(60) do + resolve_destination_address end + end - super(path, -1) - @buffer = BufferedTokenizer.new + def resolve_destination_address + request = EventMachine::DnsResolver.resolve(@destination_address) + request.callback do |addrs| + @cached_destination_ip = addrs.first + end end - + def receive_data(data) @buffer.extract(data).each do |line| transmit(line) end end + def destination_address + @cached_destination_ip || @destination_address + end + def transmit(message) - time ||= Time.now - day = time.strftime('%b %d').sub(/0(\d)/, ' \\1') + message = message.gsub(COLORED_REGEXP, '') if @strip_color - @socket.send_datagram("<#{(@facility) + @severity}>#{day} #{time.strftime('%T')} #{@hostname} #{@program}: #{message}", @dest_addr, @dest_port) - end - - def facility_value(f) - f.is_a?(Integer) ? f*8 : facilities[f.to_sym] - end - - def severity_value(s) - s.is_a?(Integer) ? s : severities[s.to_sym] - end - - def facilities - Levels::FACILITIES - end - - def severities - Levels::SEVERITIES + packet = @packet.dup + packet.content = message + + @socket.send_datagram(packet.assemble, destination_address, @destination_port) end end end