lib/instrumental/agent.rb in instrumental_agent-2.1.0 vs lib/instrumental/agent.rb in instrumental_agent-3.0.0.alpha

- old
+ new

@@ -73,11 +73,10 @@ @port = (@port || default_port).to_i @enabled = options.has_key?(:enabled) ? !!options[:enabled] : true @synchronous = !!options[:synchronous] @pid = Process.pid @allow_reconnect = true - @certs = certificates @dns_resolutions = 0 @last_connect_at = 0 @metrician = options[:metrician].nil? ? true : !!options[:metrician] @start_worker_mutex = Mutex.new @queue = Queue.new @@ -269,11 +268,11 @@ increment "agent.invalid_value" logger.warn "Invalid value #{value.inspect} for #{metric}" end def report_exception(e) - logger.error "Exception occurred: #{e.message}\n#{e.backtrace.join("\n")}" + logger.error "Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}" end def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i) self.dns_resolutions = dns_resolutions + 1 time_since_last_connect = moment_to_connect - last_connect_at @@ -348,27 +347,11 @@ end def test_connection begin - # In the case where the socket is an OpenSSL::SSL::SSLSocket, - # on Ruby 1.8.6, 1.8.7 or 1.9.1, read_nonblock does not exist, - # and so the case of testing socket liveliness via a nonblocking - # read that catches a wait condition won't work. - # - # We grab the SSL socket's underlying IO object and perform the - # non blocking read there in order to ensure the socket is still - # valid - if @socket.respond_to?(:read_nonblock) - @socket.read_nonblock(1) - elsif @socket.respond_to?(:io) - # The SSL Socket may send down additional data at close time, - # so we perform two nonblocking reads, one to pull any pending - # data on the socket, and the second to actually perform the connection - # liveliness test - @socket.io.read_nonblock(1024) && @socket.io.read_nonblock(1024) - end + @socket.read_nonblock(1) rescue *wait_exceptions # noop end end @@ -425,85 +408,89 @@ end sock end def run_worker_loop + @failures = 0 + begin + logger.info "connecting to collector" command_and_args = nil command_options = nil - logger.info "connecting to collector" - with_timeout(CONNECT_TIMEOUT) do - @socket = open_socket(@sockaddr_in, @secure, @verify_cert) - end - logger.info "connected to collector at #{host}:#{port}" - hello_options = { - "version" => "ruby/instrumental_agent/#{VERSION}", - "hostname" => HOSTNAME, - "pid" => Process.pid, - "runtime" => "#{defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"}/#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}", - "platform" => RUBY_PLATFORM - }.to_a.flatten.map { |v| v.to_s.gsub(/\s+/, "_") }.join(" ") + with_timeout(CONNECT_TIMEOUT) do + @socket = open_socket(@sockaddr_in, @secure, @verify_cert) + end + logger.info "connected to collector at #{host}:#{port}" + hello_options = { + "version" => "ruby/instrumental_agent/#{VERSION}", + "hostname" => HOSTNAME, + "pid" => Process.pid, + "runtime" => "#{defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"}/#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}", + "platform" => RUBY_PLATFORM + }.to_a.flatten.map { |v| v.to_s.gsub(/\s+/, "_") }.join(" ") - send_with_reply_timeout "hello #{hello_options}" - send_with_reply_timeout "authenticate #{@api_key}" - @failures = 0 - loop do - command_and_args, command_options = @queue.pop - if command_and_args - sync_resource = command_options && command_options[:sync_resource] - test_connection - case command_and_args - when 'exit' - logger.info "Exiting, #{@queue.size} commands remain" - return true - when 'flush' - release_resource = true - else - logger.debug "Sending: #{command_and_args.chomp}" - @socket.puts command_and_args - end - command_and_args = nil - command_options = nil - if sync_resource - @sync_mutex.synchronize do - sync_resource.signal + send_with_reply_timeout "hello #{hello_options}" + send_with_reply_timeout "authenticate #{@api_key}" + + loop do + command_and_args, command_options = @queue.pop + if command_and_args + sync_resource = command_options && command_options[:sync_resource] + test_connection + case command_and_args + when 'exit' + logger.info "Exiting, #{@queue.size} commands remain" + return true + when 'flush' + release_resource = true + else + logger.debug "Sending: #{command_and_args.chomp}" + @socket.puts command_and_args end + command_and_args = nil + command_options = nil + if sync_resource + @sync_mutex.synchronize do + sync_resource.signal + end + end end end - end - rescue Exception => err - allow_reconnect = @allow_reconnect - case err - when EOFError + rescue Exception => err + allow_reconnect = @allow_reconnect + case err + when EOFError # nop - when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error - # If the connection has been refused by Instrumental - # or we cannot reach the server - # or the connection state of this socket is in a race - logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining" - logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}" - allow_reconnect = false - else - report_exception(err) + when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error, OpenSSL::SSL::SSLError + # If the connection has been refused by Instrumental + # or we cannot reach the server + # or the connection state of this socket is in a race + # or SSL is not functioning properly for some reason + logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining" + logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}" + allow_reconnect = false + else + report_exception(err) + end + if allow_reconnect == false || + (command_options && command_options[:allow_reconnect] == false) + logger.info "Not trying to reconnect" + @failures = 0 + return + end + if command_and_args + logger.debug "requeueing: #{command_and_args}" + @queue << command_and_args + end + disconnect + @failures += 1 + delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min + logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..." + sleep delay + retry + ensure + disconnect end - if allow_reconnect == false || - (command_options && command_options[:allow_reconnect] == false) - logger.info "Not trying to reconnect" - @failures = 0 - return - end - if command_and_args - logger.debug "requeueing: #{command_and_args}" - @queue << command_and_args - end - disconnect - @failures += 1 - delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min - logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..." - sleep delay - retry - ensure - disconnect end def setup_cleanup_at_exit at_exit do cleanup @@ -539,20 +526,7 @@ end def allows_secure? defined?(OpenSSL) end - - def certificates - if allows_secure? - base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", "..")) - %w{equifax geotrust rapidssl}.map do |name| - OpenSSL::X509::Certificate.new(File.open(File.join(base_dir, "certs", "#{name}.ca.pem"))) - end - else - [] - end - end - end - end