lib/instrumental/agent.rb in instrumental_agent-0.13.1 vs lib/instrumental/agent.rb in instrumental_agent-0.13.2
- old
+ new
@@ -1,25 +1,31 @@
require 'instrumental/version'
require 'instrumental/system_timer'
require 'logger'
+require 'openssl' rescue nil
+require 'resolv'
require 'thread'
require 'socket'
module Instrumental
class Agent
- BACKOFF = 2.0
- MAX_RECONNECT_DELAY = 15
- MAX_BUFFER = 5000
- REPLY_TIMEOUT = 10
- CONNECT_TIMEOUT = 20
- EXIT_FLUSH_TIMEOUT = 5
- HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname
+ BACKOFF = 2.0
+ CONNECT_TIMEOUT = 20
+ EXIT_FLUSH_TIMEOUT = 5
+ HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname
+ MAX_BUFFER = 5000
+ MAX_RECONNECT_DELAY = 15
+ REPLY_TIMEOUT = 10
+ RESOLUTION_FAILURES_BEFORE_WAITING = 3
+ RESOLUTION_WAIT = 30
+ RESOLVE_TIMEOUT = 1
- attr_accessor :host, :port, :synchronous, :queue
- attr_reader :connection, :enabled
+ attr_accessor :host, :port, :synchronous, :queue, :dns_resolutions, :last_connect_at
+ attr_reader :connection, :enabled, :secure
+
def self.logger=(l)
@logger = l
end
def self.logger
@@ -40,21 +46,39 @@
options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m }
)
# defaults
# host: collector.instrumentalapp.com
- # port: 8000
+ # port: 8001
# enabled: true
# synchronous: false
+ # secure: true
+ # verify: true
@api_key = api_key
@host, @port = options[:collector].to_s.split(':')
@host ||= 'collector.instrumentalapp.com'
- @port = (@port || 8000).to_i
+ requested_secure = options[:secure] == true
+ desired_secure = options[:secure].nil? ? allows_secure? : !!options[:secure]
+ if !allows_secure? && desired_secure
+ logger.warn "Cannot connect to Instrumental via encrypted transport, SSL not available"
+ if requested_secure
+ options[:enabled] = false
+ logger.error "You requested secure protocol to connect to Instrumental, but it is not available on this system (OpenSSL is not defined). Connecting to Instrumental has been disabled."
+ end
+ desired_secure = false
+ end
+ @secure = desired_secure
+ @verify_cert = options[:verify_cert].nil? ? true : !!options[:verify_cert]
+ default_port = @secure ? 8001 : 8000
+ @port = (@port || default_port).to_i
@enabled = options.has_key?(:enabled) ? !!options[:enabled] : true
@synchronous = !!options[:synchronous]
@pid = Process.pid
@allow_reconnect = true
+ @certs = certificates
+ @dns_resolutions = 0
+ @last_connect_at = 0
setup_cleanup_at_exit if @enabled
end
# Store a gauge for a metric, optionally at a specific time.
@@ -177,10 +201,13 @@
disconnect
if @thread
@thread.kill
@thread = nil
end
+ if @queue
+ @queue.clear
+ end
end
# Called when a process is exiting to give it some extra time to
# push events to the service. An at_exit handler is automatically
# registered for this method, but can be called manually in cases
@@ -237,26 +264,33 @@
def report_exception(e)
logger.error "Exception occurred: #{e.message}\n#{e.backtrace.join("\n")}"
end
- def ipv4_address_for_host(host, port)
- addresses = Socket.getaddrinfo(host, port, 'AF_INET')
- if (result = addresses.first)
- _, _, _, address, _ = result
- address
- else
- raise Exception.new("Couldn't get address information for host #{host}")
+ def ipv4_address_for_host(host, port, moment_to_connect = Time.now.to_i)
+ self.dns_resolutions = dns_resolutions + 1
+ time_since_last_connect = moment_to_connect - last_connect_at
+ if dns_resolutions < RESOLUTION_FAILURES_BEFORE_WAITING || time_since_last_connect >= RESOLUTION_WAIT
+ self.last_connect_at = moment_to_connect
+ with_timeout(RESOLVE_TIMEOUT) do
+ address = Resolv.getaddresses(host).select { |address| address =~ Resolv::IPv4::Regex }.first
+ self.dns_resolutions = 0
+ address
+ end
end
+ rescue Exception => e
+ logger.warn "Couldn't resolve address for #{host}:#{port}"
+ report_exception(e)
+ nil
end
def send_command(cmd, *args)
cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
if enabled?
- start_connection_worker if !running?
- if @queue.size < MAX_BUFFER
+ start_connection_worker if !running?
+ if @queue && @queue.size < MAX_BUFFER
@queue_full_warning = false
logger.debug "Queueing: #{cmd.chomp}"
queue_message(cmd, { :synchronous => @synchronous })
else
if !@queue_full_warning
@@ -289,28 +323,63 @@
end
end
message
end
+ def wait_exceptions
+ classes = [Errno::EAGAIN]
+ if defined?(IO::EAGAINWaitReadable)
+ classes << IO::EAGAINWaitReadable
+ end
+ if defined?(IO::EWOULDBLOCKWaitReadable)
+ classes << IO::EWOULDBLOCKWaitReadable
+ end
+ if defined?(IO::WaitReadable)
+ classes << IO::WaitReadable
+ end
+ classes
+ end
+
+
def test_connection
begin
- @socket.read_nonblock(1)
- rescue Errno::EAGAIN
+ # In the case where the socket is an OpenSSL::SSL::SSLSocket,
+ # on Ruby 1.8.6, 1.8.7 or 1.9.1, read_nonblock does not exist,
+ # and so the case of testing socket liveliness via a nonblocking
+ # read that catches a wait condition won't work.
+ #
+ # We grab the SSL socket's underlying IO object and perform the
+ # non blocking read there in order to ensure the socket is still
+ # valid
+ if @socket.respond_to?(:read_nonblock)
+ @socket.read_nonblock(1)
+ elsif @socket.respond_to?(:io)
+ # The SSL Socket may send down additional data at close time,
+ # so we perform two nonblocking reads, one to pull any pending
+ # data on the socket, and the second to actually perform the connection
+ # liveliness test
+ @socket.io.read_nonblock(1024) && @socket.io.read_nonblock(1024)
+ end
+ rescue *wait_exceptions
# noop
end
end
def start_connection_worker
if enabled?
disconnect
- @pid = Process.pid
- @queue = Queue.new
- @sync_mutex = Mutex.new
- @failures = 0
- logger.info "Starting thread"
- @thread = Thread.new do
- run_worker_loop
+ @queue ||= Queue.new
+ address = ipv4_address_for_host(@host, @port)
+ if address
+ @pid = Process.pid
+ @sync_mutex = Mutex.new
+ @failures = 0
+ @sockaddr_in = Socket.pack_sockaddr_in(@port, address)
+ logger.info "Starting thread"
+ @thread = Thread.new do
+ run_worker_loop
+ end
end
end
end
def send_with_reply_timeout(message)
@@ -321,19 +390,34 @@
raise "Bad Response #{response.inspect} to #{message.inspect}"
end
end
end
+ def open_socket(sockaddr_in, secure, verify_cert)
+ sock = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0)
+ sock.connect(sockaddr_in)
+ if secure
+ context = OpenSSL::SSL::SSLContext.new
+ if verify_cert
+ context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT)
+ else
+ context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_NONE)
+ end
+ ssl_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
+ ssl_socket.sync_close = true
+ ssl_socket.connect
+ sock = ssl_socket
+ end
+ sock
+ end
-
def run_worker_loop
command_and_args = nil
command_options = nil
logger.info "connecting to collector"
- @socket = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0)
with_timeout(CONNECT_TIMEOUT) do
- @socket.connect Socket.pack_sockaddr_in(port, ipv4_address_for_host(host, port))
+ @socket = open_socket(@sockaddr_in, @secure, @verify_cert)
end
logger.info "connected to collector at #{host}:#{port}"
hello_options = {
"version" => "ruby/instrumental_agent/#{VERSION}",
"hostname" => HOSTNAME,
@@ -345,41 +429,50 @@
send_with_reply_timeout "hello #{hello_options}"
send_with_reply_timeout "authenticate #{@api_key}"
@failures = 0
loop do
command_and_args, command_options = @queue.pop
- sync_resource = command_options && command_options[:sync_resource]
- test_connection
- case command_and_args
- when 'exit'
- logger.info "Exiting, #{@queue.size} commands remain"
- return true
- when 'flush'
- release_resource = true
- else
- logger.debug "Sending: #{command_and_args.chomp}"
- @socket.puts command_and_args
- end
- command_and_args = nil
- command_options = nil
- if sync_resource
- @sync_mutex.synchronize do
- sync_resource.signal
+ if command_and_args
+ sync_resource = command_options && command_options[:sync_resource]
+ test_connection
+ case command_and_args
+ when 'exit'
+ logger.info "Exiting, #{@queue.size} commands remain"
+ return true
+ when 'flush'
+ release_resource = true
+ else
+ logger.debug "Sending: #{command_and_args.chomp}"
+ @socket.puts command_and_args
end
+ command_and_args = nil
+ command_options = nil
+ if sync_resource
+ @sync_mutex.synchronize do
+ sync_resource.signal
+ end
+ end
end
end
rescue Exception => err
- if err.is_a?(EOFError)
+ allow_reconnect = @allow_reconnect
+ case err
+ when EOFError
# nop
- elsif err.is_a?(Errno::ECONNREFUSED)
- logger.error "unable to connect to Instrumental."
+ when Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::EADDRINUSE
+ # If the connection has been refused by Instrumental
+ # or we cannot reach the server
+ # or the connection state of this socket is in a race
+ logger.error "unable to connect to Instrumental, hanging up with #{@queue.size} messages remaining"
+ allow_reconnect = false
else
report_exception(err)
end
- if @allow_reconnect == false ||
+ if allow_reconnect == false ||
(command_options && command_options[:allow_reconnect] == false)
logger.info "Not trying to reconnect"
+ @failures = 0
return
end
if command_and_args
logger.debug "requeueing: #{command_and_args}"
@queue << command_and_args
@@ -399,23 +492,44 @@
cleanup
end
end
def running?
- !@thread.nil? && @pid == Process.pid
+ !@thread.nil? && @pid == Process.pid && @thread.alive?
end
+ def flush_socket(socket)
+ socket.flush
+ end
+
def disconnect
if connected?
logger.info "Disconnecting..."
begin
- with_timeout(EXIT_FLUSH_TIMEOUT) { @socket.flush }
+ with_timeout(EXIT_FLUSH_TIMEOUT) do
+ flush_socket(@socket)
+ end
rescue Timeout::Error
logger.info "Timed out flushing socket..."
end
@socket.close
end
@socket = nil
+ end
+
+ def allows_secure?
+ defined?(OpenSSL)
+ end
+
+ def certificates
+ if allows_secure?
+ base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", ".."))
+ %w{equifax geotrust rapidssl}.map do |name|
+ OpenSSL::X509::Certificate.new(File.open(File.join(base_dir, "certs", "#{name}.ca.pem")))
+ end
+ else
+ []
+ end
end
end
end