lib/instrumental/agent.rb in instrumental_agent-0.13.1 vs lib/instrumental/agent.rb in instrumental_agent-0.13.2

- old
+ new

@@ -1,25 +1,31 @@ require 'instrumental/version' require 'instrumental/system_timer' require 'logger' +require 'openssl' rescue nil +require 'resolv' require 'thread' require 'socket' module Instrumental class Agent - BACKOFF = 2.0 - MAX_RECONNECT_DELAY = 15 - MAX_BUFFER = 5000 - REPLY_TIMEOUT = 10 - CONNECT_TIMEOUT = 20 - EXIT_FLUSH_TIMEOUT = 5 - HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname + BACKOFF = 2.0 + CONNECT_TIMEOUT = 20 + EXIT_FLUSH_TIMEOUT = 5 + HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname + MAX_BUFFER = 5000 + MAX_RECONNECT_DELAY = 15 + REPLY_TIMEOUT = 10 + RESOLUTION_FAILURES_BEFORE_WAITING = 3 + RESOLUTION_WAIT = 30 + RESOLVE_TIMEOUT = 1 - attr_accessor :host, :port, :synchronous, :queue - attr_reader :connection, :enabled + attr_accessor :host, :port, :synchronous, :queue, :dns_resolutions, :last_connect_at + attr_reader :connection, :enabled, :secure + def self.logger=(l) @logger = l end def self.logger @@ -40,21 +46,39 @@ options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m } ) # defaults # host: collector.instrumentalapp.com - # port: 8000 + # port: 8001 # enabled: true # synchronous: false + # secure: true + # verify: true @api_key = api_key @host, @port = options[:collector].to_s.split(':') @host ||= 'collector.instrumentalapp.com' - @port = (@port || 8000).to_i + requested_secure = options[:secure] == true + desired_secure = options[:secure].nil? ? allows_secure? : !!options[:secure] + if !allows_secure? && desired_secure + logger.warn "Cannot connect to Instrumental via encrypted transport, SSL not available" + if requested_secure + options[:enabled] = false + logger.error "You requested secure protocol to connect to Instrumental, but it is not available on this system (OpenSSL is not defined). Connecting to Instrumental has been disabled." + end + desired_secure = false + end + @secure = desired_secure + @verify_cert = options[:verify_cert].nil? ? true : !!options[:verify_cert] + default_port = @secure ? 8001 : 8000 + @port = (@port || default_port).to_i @enabled = options.has_key?(:enabled) ? !!options[:enabled] : true @synchronous = !!options[:synchronous] @pid = Process.pid @allow_reconnect = true + @certs = certificates + @dns_resolutions = 0 + @last_connect_at = 0 setup_cleanup_at_exit if @enabled end # Store a gauge for a metric, optionally at a specific time. @@ -177,10 +201,13 @@ disconnect if @thread @thread.kill @thread = nil end + if @queue + @queue.clear + end end # Called when a process is exiting to give it some extra time to # push events to the service. An at_exit handler is automatically # registered for this method, but can be called manually in cases @@ -237,26 +264,33 @@ def report_exception(e) logger.error "Exception occurred: #{e.message}\n#{e.backtrace.join("\n")}" end - def ipv4_address_for_host(host, port) - addresses = Socket.getaddrinfo(host, port, 'AF_INET') - if (result = addresses.first) - _, _, _, address, _ = result - address - else - raise Exception.new("Couldn't get address information for host #{host}") + def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i) + self.dns_resolutions = dns_resolutions + 1 + time_since_last_connect = moment_to_connect - last_connect_at + if dns_resolutions < RESOLUTION_FAILURES_BEFORE_WAITING || time_since_last_connect >= RESOLUTION_WAIT + self.last_connect_at = moment_to_connect + with_timeout(RESOLVE_TIMEOUT) do + address = Resolv.getaddresses(host).select { |address| address =~ Resolv::IPv4::Regex }.first + self.dns_resolutions = 0 + address + end end + rescue Exception => e + logger.warn "Couldn't resolve address for #{host}:#{port}" + report_exception(e) + nil end def send_command(cmd, *args) cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")] if enabled? - start_connection_worker if !running? - if @queue.size < MAX_BUFFER + start_connection_worker if !running? + if @queue && @queue.size < MAX_BUFFER @queue_full_warning = false logger.debug "Queueing: #{cmd.chomp}" queue_message(cmd, { :synchronous => @synchronous }) else if !@queue_full_warning @@ -289,28 +323,63 @@ end end message end + def wait_exceptions + classes = [Errno::EAGAIN] + if defined?(IO::EAGAINWaitReadable) + classes << IO::EAGAINWaitReadable + end + if defined?(IO::EWOULDBLOCKWaitReadable) + classes << IO::EWOULDBLOCKWaitReadable + end + if defined?(IO::WaitReadable) + classes << IO::WaitReadable + end + classes + end + + def test_connection begin - @socket.read_nonblock(1) - rescue Errno::EAGAIN + # In the case where the socket is an OpenSSL::SSL::SSLSocket, + # on Ruby 1.8.6, 1.8.7 or 1.9.1, read_nonblock does not exist, + # and so the case of testing socket liveliness via a nonblocking + # read that catches a wait condition won't work. + # + # We grab the SSL socket's underlying IO object and perform the + # non blocking read there in order to ensure the socket is still + # valid + if @socket.respond_to?(:read_nonblock) + @socket.read_nonblock(1) + elsif @socket.respond_to?(:io) + # The SSL Socket may send down additional data at close time, + # so we perform two nonblocking reads, one to pull any pending + # data on the socket, and the second to actually perform the connection + # liveliness test + @socket.io.read_nonblock(1024) && @socket.io.read_nonblock(1024) + end + rescue *wait_exceptions # noop end end def start_connection_worker if enabled? disconnect - @pid = Process.pid - @queue = Queue.new - @sync_mutex = Mutex.new - @failures = 0 - logger.info "Starting thread" - @thread = Thread.new do - run_worker_loop + @queue ||= Queue.new + address = ipv4_address_for_host(@host, @port) + if address + @pid = Process.pid + @sync_mutex = Mutex.new + @failures = 0 + @sockaddr_in = Socket.pack_sockaddr_in(@port, address) + logger.info "Starting thread" + @thread = Thread.new do + run_worker_loop + end end end end def send_with_reply_timeout(message) @@ -321,19 +390,34 @@ raise "Bad Response #{response.inspect} to #{message.inspect}" end end end + def open_socket(sockaddr_in, secure, verify_cert) + sock = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0) + sock.connect(sockaddr_in) + if secure + context = OpenSSL::SSL::SSLContext.new + if verify_cert + context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT) + else + context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_NONE) + end + ssl_socket = OpenSSL::SSL::SSLSocket.new(sock, context) + ssl_socket.sync_close = true + ssl_socket.connect + sock = ssl_socket + end + sock + end - def run_worker_loop command_and_args = nil command_options = nil logger.info "connecting to collector" - @socket = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0) with_timeout(CONNECT_TIMEOUT) do - @socket.connect Socket.pack_sockaddr_in(port, ipv4_address_for_host(host, port)) + @socket = open_socket(@sockaddr_in, @secure, @verify_cert) end logger.info "connected to collector at #{host}:#{port}" hello_options = { "version" => "ruby/instrumental_agent/#{VERSION}", "hostname" => HOSTNAME, @@ -345,41 +429,50 @@ send_with_reply_timeout "hello #{hello_options}" send_with_reply_timeout "authenticate #{@api_key}" @failures = 0 loop do command_and_args, command_options = @queue.pop - sync_resource = command_options && command_options[:sync_resource] - test_connection - case command_and_args - when 'exit' - logger.info "Exiting, #{@queue.size} commands remain" - return true - when 'flush' - release_resource = true - else - logger.debug "Sending: #{command_and_args.chomp}" - @socket.puts command_and_args - end - command_and_args = nil - command_options = nil - if sync_resource - @sync_mutex.synchronize do - sync_resource.signal + if command_and_args + sync_resource = command_options && command_options[:sync_resource] + test_connection + case command_and_args + when 'exit' + logger.info "Exiting, #{@queue.size} commands remain" + return true + when 'flush' + release_resource = true + else + logger.debug "Sending: #{command_and_args.chomp}" + @socket.puts command_and_args end + command_and_args = nil + command_options = nil + if sync_resource + @sync_mutex.synchronize do + sync_resource.signal + end + end end end rescue Exception => err - if err.is_a?(EOFError) + allow_reconnect = @allow_reconnect + case err + when EOFError # nop - elsif err.is_a?(Errno::ECONNREFUSED) - logger.error "unable to connect to Instrumental." + when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE + # If the connection has been refused by Instrumental + # or we cannot reach the server + # or the connection state of this socket is in a race + logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining" + allow_reconnect = false else report_exception(err) end - if @allow_reconnect == false || + if allow_reconnect == false || (command_options && command_options[:allow_reconnect] == false) logger.info "Not trying to reconnect" + @failures = 0 return end if command_and_args logger.debug "requeueing: #{command_and_args}" @queue << command_and_args @@ -399,23 +492,44 @@ cleanup end end def running? - !@thread.nil? && @pid == Process.pid + !@thread.nil? && @pid == Process.pid && @thread.alive? end + def flush_socket(socket) + socket.flush + end + def disconnect if connected? logger.info "Disconnecting..." begin - with_timeout(EXIT_FLUSH_TIMEOUT) { @socket.flush } + with_timeout(EXIT_FLUSH_TIMEOUT) do + flush_socket(@socket) + end rescue Timeout::Error logger.info "Timed out flushing socket..." end @socket.close end @socket = nil + end + + def allows_secure? + defined?(OpenSSL) + end + + def certificates + if allows_secure? + base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", "..")) + %w{equifax geotrust rapidssl}.map do |name| + OpenSSL::X509::Certificate.new(File.open(File.join(base_dir, "certs", "#{name}.ca.pem"))) + end + else + [] + end end end end