lib/instrumental/agent.rb in instrumental_agent-0.12.7 vs lib/instrumental/agent.rb in instrumental_agent-0.13.0
- old
+ new
@@ -1,24 +1,26 @@
require 'instrumental/version'
require 'instrumental/system_timer'
require 'logger'
+require 'openssl' rescue nil
require 'thread'
require 'socket'
module Instrumental
class Agent
- BACKOFF = 2.0
+ BACKOFF = 2.0
+ CONNECT_TIMEOUT = 20
+ EXIT_FLUSH_TIMEOUT = 5
+ HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname
+ MAX_BUFFER = 5000
MAX_RECONNECT_DELAY = 15
- MAX_BUFFER = 5000
- REPLY_TIMEOUT = 10
- CONNECT_TIMEOUT = 20
- EXIT_FLUSH_TIMEOUT = 5
- HOSTNAME = Socket.gethostbyname(Socket.gethostname).first rescue Socket.gethostname
+ REPLY_TIMEOUT = 10
+
attr_accessor :host, :port, :synchronous, :queue
- attr_reader :connection, :enabled
+ attr_reader :connection, :enabled, :secure
def self.logger=(l)
@logger = l
end
@@ -40,22 +42,37 @@
options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m }
)
# defaults
# host: collector.instrumentalapp.com
- # port: 8000
+ # port: 8001
# enabled: true
# synchronous: false
+ # secure: true
+ # verify: true
@api_key = api_key
@host, @port = options[:collector].to_s.split(':')
@host ||= 'collector.instrumentalapp.com'
- @port = (@port || 8000).to_i
+ requested_secure = options[:secure] == true
+ desired_secure = options[:secure].nil? ? allows_secure? : !!options[:secure]
+ if !allows_secure? && desired_secure
+ logger.warn "Cannot connect to Instrumental via encrypted transport, SSL not available"
+ if requested_secure
+ options[:enabled] = false
+ logger.error "You requested secure protocol to connect to Instrumental, but it is not available on this system (OpenSSL is not defined). Connecting to Instrumental has been disabled."
+ end
+ desired_secure = false
+ end
+ @secure = desired_secure
+ @verify_cert = options[:verify_cert].nil? ? true : !!options[:verify_cert]
+ default_port = @secure ? 8001 : 8000
+ @port = (@port || default_port).to_i
@enabled = options.has_key?(:enabled) ? !!options[:enabled] : true
@synchronous = !!options[:synchronous]
@pid = Process.pid
@allow_reconnect = true
-
+ @certs = certificates
setup_cleanup_at_exit if @enabled
end
# Store a gauge for a metric, optionally at a specific time.
#
@@ -289,14 +306,45 @@
end
end
message
end
+ def wait_exceptions
+ classes = [Errno::EAGAIN]
+ if defined?(IO::EAGAINWaitReadable)
+ classes << IO::EAGAINWaitReadable
+ end
+ if defined?(IO::EWOULDBLOCKWaitReadable)
+ classes << IO::EWOULDBLOCKWaitReadable
+ end
+ if defined?(IO::WaitReadable)
+ classes << IO::WaitReadable
+ end
+ classes
+ end
+
+
def test_connection
begin
- @socket.read_nonblock(1)
- rescue Errno::EAGAIN
+ # In the case where the socket is an OpenSSL::SSL::SSLSocket,
+ # on Ruby 1.8.6, 1.8.7 or 1.9.1, read_nonblock does not exist,
+ # and so the case of testing socket liveliness via a nonblocking
+ # read that catches a wait condition won't work.
+ #
+ # We grab the SSL socket's underlying IO object and perform the
+ # non blocking read there in order to ensure the socket is still
+ # valid
+ if @socket.respond_to?(:read_nonblock)
+ @socket.read_nonblock(1)
+ elsif @socket.respond_to?(:io)
+ # The SSL Socket may send down additional data at close time,
+ # so we perform two nonblocking reads, one to pull any pending
+ # data on the socket, and the second to actually perform the connection
+ # liveliness test
+ @socket.io.read_nonblock(1024) && @socket.io.read_nonblock(1024)
+ end
+ rescue *wait_exceptions
# noop
end
end
def start_connection_worker
@@ -321,19 +369,33 @@
raise "Bad Response #{response.inspect} to #{message.inspect}"
end
end
end
+ def open_socket(remote_host, remote_port, secure, verify_cert)
+ sock = TCPSocket.open(remote_host, remote_port)
+ if secure
+ context = OpenSSL::SSL::SSLContext.new()
+ if verify_cert
+ context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT)
+ else
+ context.set_params(:verify_mode => OpenSSL::SSL::VERIFY_NONE)
+ end
+ ssl_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
+ ssl_socket.sync_close = true
+ ssl_socket.connect
+ sock = ssl_socket
+ end
+ sock
+ end
-
def run_worker_loop
command_and_args = nil
command_options = nil
logger.info "connecting to collector"
- @socket = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0)
with_timeout(CONNECT_TIMEOUT) do
- @socket.connect Socket.pack_sockaddr_in(port, ipv4_address_for_host(host, port))
+ @socket = open_socket(host, port, @secure, @verify_cert)
end
logger.info "connected to collector at #{host}:#{port}"
hello_options = {
"version" => "ruby/instrumental_agent/#{VERSION}",
"hostname" => HOSTNAME,
@@ -402,20 +464,41 @@
def running?
!@thread.nil? && @pid == Process.pid
end
+ def flush_socket(socket)
+ socket.flush
+ end
+
def disconnect
if connected?
logger.info "Disconnecting..."
begin
- with_timeout(EXIT_FLUSH_TIMEOUT) { @socket.flush }
+ with_timeout(EXIT_FLUSH_TIMEOUT) do
+ flush_socket(@socket)
+ end
rescue Timeout::Error
logger.info "Timed out flushing socket..."
end
@socket.close
end
@socket = nil
+ end
+
+ def allows_secure?
+ defined?(OpenSSL)
+ end
+
+ def certificates
+ if allows_secure?
+ base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..", ".."))
+ %w{equifax geotrust rapidssl}.map do |name|
+ OpenSSL::X509::Certificate.new(File.open(File.join(base_dir, "certs", "#{name}.ca.pem")))
+ end
+ else
+ []
+ end
end
end
end