lib/instrumental/agent.rb in instrumental_agent-0.11.1 vs lib/instrumental/agent.rb in instrumental_agent-0.12.0

- old
+ new

@@ -1,47 +1,12 @@ -require 'instrumental/rack/middleware' require 'instrumental/version' +require 'instrumental/system_timer' require 'logger' require 'thread' require 'socket' -if RUBY_VERSION < "1.9" && RUBY_PLATFORM != "java" - timeout_lib = nil - ["SystemTimer", "system_timer"].each do |lib| - begin - unless timeout_lib - gem lib - require "system_timer" - timeout_lib = SystemTimer - end - rescue Exception => e - end - end - if !timeout_lib - puts <<-EOMSG -WARNING:: You do not currently have system_timer installed. -It is strongly advised that you install this gem when using -instrumental_agent with Ruby 1.8.x. You can install it in -your Gemfile via: -gem 'system_timer' -or manually via: -gem install system_timer - EOMSG - require 'timeout' - InstrumentalTimeout = Timeout - else - InstrumentalTimeout = timeout_lib - end -else - require 'timeout' - InstrumentalTimeout = Timeout -end - -# Sets up a connection to the collector. -# -# Instrumental::Agent.new(API_KEY) module Instrumental class Agent BACKOFF = 2.0 MAX_RECONNECT_DELAY = 15 MAX_BUFFER = 5000 @@ -62,20 +27,10 @@ @logger.level = Logger::WARN end @logger end - def self.all - @agents ||= [] - end - - def self.new(*args) - inst = super - all << inst - inst - end - # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) # Instrumental::Agent.new(API_KEY, :collector => 'hostname:port') def initialize(api_key, options = {}) @@ -203,11 +158,11 @@ def logger=(logger) @logger = logger end def logger - @logger ||= self.class.logger + @logger || self.class.logger end # Stopping the agent will immediately stop all communication # to Instrumental. If you call this and submit another metric, # the agent will start again. @@ -288,14 +243,19 @@ if enabled? start_connection_worker if !running? cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")] if @queue.size < MAX_BUFFER + @queue_full_warning = false logger.debug "Queueing: #{cmd.chomp}" queue_message(cmd, { :synchronous => @synchronous }) else - logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}" + if !@queue_full_warning + @queue_full_warning = true + logger.warn "Queue full(#{@queue.size}), dropping commands..." + end + logger.debug "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}" nil end end end @@ -318,13 +278,12 @@ end message end def test_connection - # FIXME: Test connection state hack begin - @socket.read_nonblock(1) # TODO: put data back? + @socket.read_nonblock(1) rescue Errno::EAGAIN # noop end end @@ -356,11 +315,11 @@ command_and_args = nil command_options = nil logger.info "connecting to collector" @socket = with_timeout(CONNECT_TIMEOUT) { TCPSocket.new(host, port) } logger.info "connected to collector at #{host}:#{port}" - send_with_reply_timeout "hello version #{Instrumental::VERSION} hostname #{Socket.gethostname}" + send_with_reply_timeout "hello version #{Instrumental::VERSION} hostname #{Socket.gethostname} pid #{Process.pid}" send_with_reply_timeout "authenticate #{@api_key}" @failures = 0 loop do command_and_args, command_options = @queue.pop sync_resource = command_options && command_options[:sync_resource] @@ -416,10 +375,14 @@ end def disconnect if connected? logger.info "Disconnecting..." - @socket.flush + begin + with_timeout(EXIT_FLUSH_TIMEOUT) { @socket.flush } + rescue Timeout::Error + logger.info "Timed out flushing socket..." + end @socket.close end @socket = nil end