lib/instrumental/agent.rb in instrumental_agent-0.11.1 vs lib/instrumental/agent.rb in instrumental_agent-0.12.0
- old
+ new
@@ -1,47 +1,12 @@
-require 'instrumental/rack/middleware'
require 'instrumental/version'
+require 'instrumental/system_timer'
require 'logger'
require 'thread'
require 'socket'
-if RUBY_VERSION < "1.9" && RUBY_PLATFORM != "java"
- timeout_lib = nil
- ["SystemTimer", "system_timer"].each do |lib|
- begin
- unless timeout_lib
- gem lib
- require "system_timer"
- timeout_lib = SystemTimer
- end
- rescue Exception => e
- end
- end
- if !timeout_lib
- puts <<-EOMSG
-WARNING:: You do not currently have system_timer installed.
-It is strongly advised that you install this gem when using
-instrumental_agent with Ruby 1.8.x. You can install it in
-your Gemfile via:
-gem 'system_timer'
-or manually via:
-gem install system_timer
- EOMSG
- require 'timeout'
- InstrumentalTimeout = Timeout
- else
- InstrumentalTimeout = timeout_lib
- end
-else
- require 'timeout'
- InstrumentalTimeout = Timeout
-end
-
-# Sets up a connection to the collector.
-#
-# Instrumental::Agent.new(API_KEY)
module Instrumental
class Agent
BACKOFF = 2.0
MAX_RECONNECT_DELAY = 15
MAX_BUFFER = 5000
@@ -62,20 +27,10 @@
@logger.level = Logger::WARN
end
@logger
end
- def self.all
- @agents ||= []
- end
-
- def self.new(*args)
- inst = super
- all << inst
- inst
- end
-
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
# Instrumental::Agent.new(API_KEY, :collector => 'hostname:port')
def initialize(api_key, options = {})
@@ -203,11 +158,11 @@
def logger=(logger)
@logger = logger
end
def logger
- @logger ||= self.class.logger
+ @logger || self.class.logger
end
# Stopping the agent will immediately stop all communication
# to Instrumental. If you call this and submit another metric,
# the agent will start again.
@@ -288,14 +243,19 @@
if enabled?
start_connection_worker if !running?
cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
if @queue.size < MAX_BUFFER
+ @queue_full_warning = false
logger.debug "Queueing: #{cmd.chomp}"
queue_message(cmd, { :synchronous => @synchronous })
else
- logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
+ if !@queue_full_warning
+ @queue_full_warning = true
+ logger.warn "Queue full(#{@queue.size}), dropping commands..."
+ end
+ logger.debug "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
nil
end
end
end
@@ -318,13 +278,12 @@
end
message
end
def test_connection
- # FIXME: Test connection state hack
begin
- @socket.read_nonblock(1) # TODO: put data back?
+ @socket.read_nonblock(1)
rescue Errno::EAGAIN
# noop
end
end
@@ -356,11 +315,11 @@
command_and_args = nil
command_options = nil
logger.info "connecting to collector"
@socket = with_timeout(CONNECT_TIMEOUT) { TCPSocket.new(host, port) }
logger.info "connected to collector at #{host}:#{port}"
- send_with_reply_timeout "hello version #{Instrumental::VERSION} hostname #{Socket.gethostname}"
+ send_with_reply_timeout "hello version #{Instrumental::VERSION} hostname #{Socket.gethostname} pid #{Process.pid}"
send_with_reply_timeout "authenticate #{@api_key}"
@failures = 0
loop do
command_and_args, command_options = @queue.pop
sync_resource = command_options && command_options[:sync_resource]
@@ -416,10 +375,14 @@
end
def disconnect
if connected?
logger.info "Disconnecting..."
- @socket.flush
+ begin
+ with_timeout(EXIT_FLUSH_TIMEOUT) { @socket.flush }
+ rescue Timeout::Error
+ logger.info "Timed out flushing socket..."
+ end
@socket.close
end
@socket = nil
end