lib/remote_syslog/reader.rb in remote_syslog-1.0.0 vs lib/remote_syslog/reader.rb in remote_syslog-1.1.0
- old
+ new
@@ -1,57 +1,69 @@
+require 'eventmachine'
+require 'eventmachine-tail'
+require 'em-dns-resolver'
+require 'syslog_protocol'
+
module RemoteSyslog
class Reader < EventMachine::FileTail
- def initialize(path, dest_addr, dest_port, options = {})
- @dest_addr = dest_addr
- @dest_port = dest_port.to_i
-
+ COLORED_REGEXP = /\e\[(?:(?:[34][0-7]|[0-9]);){0,2}(?:[34][0-7]|[0-9])m/
+
+ def initialize(path, destination_address, destination_port, options = {})
+ super(path, -1)
+
+ @destination_address = destination_address
+ @destination_port = destination_port.to_i
+
+ @strip_color = options[:strip_color]
+
@socket = options[:socket] || EventMachine.open_datagram_socket('0.0.0.0', 0)
- @program = options[:program] || File.basename(path) || 'remote_syslog'
- @hostname = options[:hostname] || `hostname`.strip
- @hostname = 'localhost' unless @hostname && @hostname != ''
-
- if options[:severity]
- @severity = severity_value(options[:severity]) || raise(ArgumentError, "Invalid severity: #{options[:severity]} (valid: #{severities.keys.join(', ')})")
- else
- @severity = severity_value(:notice)
+
+ @buffer = BufferedTokenizer.new
+
+ @packet = SyslogProtocol::Packet.new
+
+ local_hostname = options[:hostname] || (Socket.gethostname rescue `hostname`.chomp)
+ if local_hostname.nil? || local_hostname.empty?
+ local_hostname = 'localhost'
end
-
- if options[:facility]
- @facility = facility_value(options[:facility]) || raise(ArgumentError, "Invalid facility: #{options[:facility]} (valid: #{facilities.keys.join(', ')}")
- else
- @facility = facility_value(:user)
+
+ @packet.hostname = local_hostname
+ @packet.facility = options[:facility] || 'user'
+ @packet.severity = options[:severity] || 'notice'
+ @packet.tag = options[:program] || File.basename(path) || File.basename($0)
+
+ # Try to resolve the destination address
+ resolve_destination_address
+
+ # Every 60 seconds we'll see if the address has changed
+ EventMachine.add_periodic_timer(60) do
+ resolve_destination_address
end
+ end
- super(path, -1)
- @buffer = BufferedTokenizer.new
+ def resolve_destination_address
+ request = EventMachine::DnsResolver.resolve(@destination_address)
+ request.callback do |addrs|
+ @cached_destination_ip = addrs.first
+ end
end
-
+
def receive_data(data)
@buffer.extract(data).each do |line|
transmit(line)
end
end
+ def destination_address
+ @cached_destination_ip || @destination_address
+ end
+
def transmit(message)
- time ||= Time.now
- day = time.strftime('%b %d').sub(/0(\d)/, ' \\1')
+ message = message.gsub(COLORED_REGEXP, '') if @strip_color
- @socket.send_datagram("<#{(@facility) + @severity}>#{day} #{time.strftime('%T')} #{@hostname} #{@program}: #{message}", @dest_addr, @dest_port)
- end
-
- def facility_value(f)
- f.is_a?(Integer) ? f*8 : facilities[f.to_sym]
- end
-
- def severity_value(s)
- s.is_a?(Integer) ? s : severities[s.to_sym]
- end
-
- def facilities
- Levels::FACILITIES
- end
-
- def severities
- Levels::SEVERITIES
+ packet = @packet.dup
+ packet.content = message
+
+ @socket.send_datagram(packet.assemble, destination_address, @destination_port)
end
end
end