lib/instrumental/agent.rb in instrumental_agent-2.1.0 vs lib/instrumental/agent.rb in instrumental_agent-3.0.0.alpha
- old
+ new
@@ -73,11 +73,10 @@
@port = (@port || default_port).to_i
@enabled = options.has_key?(:enabled) ? !!options[:enabled] : true
@synchronous = !!options[:synchronous]
@pid = Process.pid
@allow_reconnect = true
- @certs = certificates
@dns_resolutions = 0
@last_connect_at = 0
@metrician = options[:metrician].nil? ? true : !!options[:metrician]
@start_worker_mutex = Mutex.new
@queue = Queue.new
@@ -269,11 +268,11 @@
increment "agent.invalid_value"
logger.warn "Invalid value #{value.inspect} for #{metric}"
end
def report_exception(e)
- logger.error "Exception occurred: #{e.message}\n#{e.backtrace.join("\n")}"
+ logger.error "Exception of type #{e.class} occurred:\n#{e.message}\n#{e.backtrace.join("\n")}"
end
def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i)
self.dns_resolutions = dns_resolutions + 1
time_since_last_connect = moment_to_connect - last_connect_at
@@ -348,27 +347,11 @@
end
def test_connection
begin
- # In the case where the socket is an OpenSSL::SSL::SSLSocket,
- # on Ruby 1.8.6, 1.8.7 or 1.9.1, read_nonblock does not exist,
- # and so the case of testing socket liveliness via a nonblocking
- # read that catches a wait condition won't work.
- #
- # We grab the SSL socket's underlying IO object and perform the
- # non blocking read there in order to ensure the socket is still
- # valid
- if @socket.respond_to?(:read_nonblock)
- @socket.read_nonblock(1)
- elsif @socket.respond_to?(:io)
- # The SSL Socket may send down additional data at close time,
- # so we perform two nonblocking reads, one to pull any pending
- # data on the socket, and the second to actually perform the connection
- # liveliness test
- @socket.io.read_nonblock(1024) && @socket.io.read_nonblock(1024)
- end
+ @socket.read_nonblock(1)
rescue *wait_exceptions
# noop
end
end
@@ -425,85 +408,89 @@
end
sock
end
def run_worker_loop
+ @failures = 0
+ begin
+ logger.info "connecting to collector"
command_and_args = nil
command_options = nil
- logger.info "connecting to collector"
- with_timeout(CONNECT_TIMEOUT) do
- @socket = open_socket(@sockaddr_in, @secure, @verify_cert)
- end
- logger.info "connected to collector at #{host}:#{port}"
- hello_options = {
- "version" => "ruby/instrumental_agent/#{VERSION}",
- "hostname" => HOSTNAME,
- "pid" => Process.pid,
- "runtime" => "#{defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"}/#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}",
- "platform" => RUBY_PLATFORM
- }.to_a.flatten.map { |v| v.to_s.gsub(/\s+/, "_") }.join(" ")
+ with_timeout(CONNECT_TIMEOUT) do
+ @socket = open_socket(@sockaddr_in, @secure, @verify_cert)
+ end
+ logger.info "connected to collector at #{host}:#{port}"
+ hello_options = {
+ "version" => "ruby/instrumental_agent/#{VERSION}",
+ "hostname" => HOSTNAME,
+ "pid" => Process.pid,
+ "runtime" => "#{defined?(RUBY_ENGINE) ? RUBY_ENGINE : "ruby"}/#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}",
+ "platform" => RUBY_PLATFORM
+ }.to_a.flatten.map { |v| v.to_s.gsub(/\s+/, "_") }.join(" ")
- send_with_reply_timeout "hello #{hello_options}"
- send_with_reply_timeout "authenticate #{@api_key}"
- @failures = 0
- loop do
- command_and_args, command_options = @queue.pop
- if command_and_args
- sync_resource = command_options && command_options[:sync_resource]
- test_connection
- case command_and_args
- when 'exit'
- logger.info "Exiting, #{@queue.size} commands remain"
- return true
- when 'flush'
- release_resource = true
- else
- logger.debug "Sending: #{command_and_args.chomp}"
- @socket.puts command_and_args
- end
- command_and_args = nil
- command_options = nil
- if sync_resource
- @sync_mutex.synchronize do
- sync_resource.signal
+ send_with_reply_timeout "hello #{hello_options}"
+ send_with_reply_timeout "authenticate #{@api_key}"
+
+ loop do
+ command_and_args, command_options = @queue.pop
+ if command_and_args
+ sync_resource = command_options && command_options[:sync_resource]
+ test_connection
+ case command_and_args
+ when 'exit'
+ logger.info "Exiting, #{@queue.size} commands remain"
+ return true
+ when 'flush'
+ release_resource = true
+ else
+ logger.debug "Sending: #{command_and_args.chomp}"
+ @socket.puts command_and_args
end
+ command_and_args = nil
+ command_options = nil
+ if sync_resource
+ @sync_mutex.synchronize do
+ sync_resource.signal
+ end
+ end
end
end
- end
- rescue Exception => err
- allow_reconnect = @allow_reconnect
- case err
- when EOFError
+ rescue Exception => err
+ allow_reconnect = @allow_reconnect
+ case err
+ when EOFError
# nop
- when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error
- # If the connection has been refused by Instrumental
- # or we cannot reach the server
- # or the connection state of this socket is in a race
- logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining"
- logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}"
- allow_reconnect = false
- else
- report_exception(err)
+ when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE, Timeout::Error, OpenSSL::SSL::SSLError
+ # If the connection has been refused by Instrumental
+ # or we cannot reach the server
+ # or the connection state of this socket is in a race
+ # or SSL is not functioning properly for some reason
+ logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining"
+ logger.debug "Exception: #{err.inspect}\n#{err.backtrace.join("\n")}"
+ allow_reconnect = false
+ else
+ report_exception(err)
+ end
+ if allow_reconnect == false ||
+ (command_options && command_options[:allow_reconnect] == false)
+ logger.info "Not trying to reconnect"
+ @failures = 0
+ return
+ end
+ if command_and_args
+ logger.debug "requeueing: #{command_and_args}"
+ @queue << command_and_args
+ end
+ disconnect
+ @failures += 1
+ delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
+ logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..."
+ sleep delay
+ retry
+ ensure
+ disconnect
end
- if allow_reconnect == false ||
- (command_options && command_options[:allow_reconnect] == false)
- logger.info "Not trying to reconnect"
- @failures = 0
- return
- end
- if command_and_args
- logger.debug "requeueing: #{command_and_args}"
- @queue << command_and_args
- end
- disconnect
- @failures += 1
- delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
- logger.error "disconnected, #{@failures} failures in a row, reconnect in #{delay}..."
- sleep delay
- retry
- ensure
- disconnect
end
def setup_cleanup_at_exit
at_exit do
cleanup
@@ -539,20 +526,7 @@
end
def allows_secure?
defined?(OpenSSL)
end
-
- def certificates
- if allows_secure?
- base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", ".."))
- %w{equifax geotrust rapidssl}.map do |name|
- OpenSSL::X509::Certificate.new(File.open(File.join(base_dir, "certs", "#{name}.ca.pem")))
- end
- else
- []
- end
- end
-
end
-
end