lib/instrumental/agent.rb in instrumental_agent-0.12.7 vs lib/instrumental/agent.rb in instrumental_agent-0.13.0

- old
+ new

@@ -1,24 +1,26 @@ require 'instrumental/version' require 'instrumental/system_timer' require 'logger' +require 'openssl' rescue nil require 'thread' require 'socket' module Instrumental class Agent - BACKOFF = 2.0 + BACKOFF = 2.0 + CONNECT_TIMEOUT = 20 + EXIT_FLUSH_TIMEOUT = 5 + HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname + MAX_BUFFER = 5000 MAX_RECONNECT_DELAY = 15 - MAX_BUFFER = 5000 - REPLY_TIMEOUT = 10 - CONNECT_TIMEOUT = 20 - EXIT_FLUSH_TIMEOUT = 5 - HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname + REPLY_TIMEOUT = 10 + attr_accessor :host, :port, :synchronous, :queue - attr_reader :connection, :enabled + attr_reader :connection, :enabled, :secure def self.logger=(l) @logger = l end @@ -40,22 +42,37 @@ options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m } ) # defaults # host: collector.instrumentalapp.com - # port: 8000 + # port: 8001 # enabled: true # synchronous: false + # secure: true + # verify: true @api_key = api_key @host, @port = options[:collector].to_s.split(':') @host ||= 'collector.instrumentalapp.com' - @port = (@port || 8000).to_i + requested_secure = options[:secure] == true + desired_secure = options[:secure].nil? ? allows_secure? : !!options[:secure] + if !allows_secure? && desired_secure + logger.warn "Cannot connect to Instrumental via encrypted transport, SSL not available" + if requested_secure + options[:enabled] = false + logger.error "You requested secure protocol to connect to Instrumental, but it is not available on this system (OpenSSL is not defined). Connecting to Instrumental has been disabled." + end + desired_secure = false + end + @secure = desired_secure + @verify_cert = options[:verify_cert].nil? ? true : !!options[:verify_cert] + default_port = @secure ? 8001 : 8000 + @port = (@port || default_port).to_i @enabled = options.has_key?(:enabled) ? !!options[:enabled] : true @synchronous = !!options[:synchronous] @pid = Process.pid @allow_reconnect = true - + @certs = certificates setup_cleanup_at_exit if @enabled end # Store a gauge for a metric, optionally at a specific time. # @@ -289,14 +306,45 @@ end end message end + def wait_exceptions + classes = [Errno::EAGAIN] + if defined?(IO::EAGAINWaitReadable) + classes << IO::EAGAINWaitReadable + end + if defined?(IO::EWOULDBLOCKWaitReadable) + classes << IO::EWOULDBLOCKWaitReadable + end + if defined?(IO::WaitReadable) + classes << IO::WaitReadable + end + classes + end + + def test_connection begin - @socket.read_nonblock(1) - rescue Errno::EAGAIN + # In the case where the socket is an OpenSSL::SSL::SSLSocket, + # on Ruby 1.8.6, 1.8.7 or 1.9.1, read_nonblock does not exist, + # and so the case of testing socket liveliness via a nonblocking + # read that catches a wait condition won't work. + # + # We grab the SSL socket's underlying IO object and perform the + # non blocking read there in order to ensure the socket is still + # valid + if @socket.respond_to?(:read_nonblock) + @socket.read_nonblock(1) + elsif @socket.respond_to?(:io) + # The SSL Socket may send down additional data at close time, + # so we perform two nonblocking reads, one to pull any pending + # data on the socket, and the second to actually perform the connection + # liveliness test + @socket.io.read_nonblock(1024) && @socket.io.read_nonblock(1024) + end + rescue *wait_exceptions # noop end end def start_connection_worker @@ -321,19 +369,33 @@ raise "Bad Response #{response.inspect} to #{message.inspect}" end end end + def open_socket(remote_host, remote_port, secure, verify_cert) + sock = TCPSocket.open(remote_host, remote_port) + if secure + context = OpenSSL::SSL::SSLContext.new() + if verify_cert + context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT) + else + context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_NONE) + end + ssl_socket = OpenSSL::SSL::SSLSocket.new(sock, context) + ssl_socket.sync_close = true + ssl_socket.connect + sock = ssl_socket + end + sock + end - def run_worker_loop command_and_args = nil command_options = nil logger.info "connecting to collector" - @socket = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0) with_timeout(CONNECT_TIMEOUT) do - @socket.connect Socket.pack_sockaddr_in(port, ipv4_address_for_host(host, port)) + @socket = open_socket(host, port, @secure, @verify_cert) end logger.info "connected to collector at #{host}:#{port}" hello_options = { "version" => "ruby/instrumental_agent/#{VERSION}", "hostname" => HOSTNAME, @@ -402,20 +464,41 @@ def running? !@thread.nil? && @pid == Process.pid end + def flush_socket(socket) + socket.flush + end + def disconnect if connected? logger.info "Disconnecting..." begin - with_timeout(EXIT_FLUSH_TIMEOUT) { @socket.flush } + with_timeout(EXIT_FLUSH_TIMEOUT) do + flush_socket(@socket) + end rescue Timeout::Error logger.info "Timed out flushing socket..." end @socket.close end @socket = nil + end + + def allows_secure? + defined?(OpenSSL) + end + + def certificates + if allows_secure? + base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", "..")) + %w{equifax geotrust rapidssl}.map do |name| + OpenSSL::X509::Certificate.new(File.open(File.join(base_dir, "certs", "#{name}.ca.pem"))) + end + else + [] + end end end end