lib/instrumental/agent.rb in instrumental_agent-0.2.0 vs lib/instrumental/agent.rb in instrumental_agent-0.3.0

- old
+ new

@@ -13,11 +13,11 @@ MAX_RECONNECT_DELAY = 15 MAX_BUFFER = 100 attr_accessor :host, :port attr_reader :connection, :enabled - + def self.logger=(l) @logger = l end def self.logger @@ -37,22 +37,24 @@ # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) # Instrumental::Agent.new(API_KEY, :collector => 'hostname:port') def initialize(api_key, options = {}) - default_options = { :enabled => true } - options = default_options.merge(options) - @api_key = api_key - if options[:collector] - @host, @port = options[:collector].split(':') - @port = (@port || 8000).to_i - else - @host = 'instrumentalapp.com' - @port = 8000 - end + default_options = { + :collector => 'instrumentalapp.com:8000', + :enabled => true, + :test_mode => false, + } + options = default_options.merge(options) + collector = options[:collector].split(':') - @enabled = options[:enabled] + @api_key = api_key + @host = collector[0] + @port = (collector[1] || 8000).to_i + @enabled = options[:enabled] + @test_mode = options[:test_mode] + if @enabled @failures = 0 @queue = Queue.new start_connection_thread end @@ -60,21 +62,34 @@ # Store a gauge for a metric, optionally at a specific time. # # agent.gauge('load', 1.23) def gauge(metric, value, time = Time.now) - if valid?(metric, value, time) - send_command("gauge", metric, value, time.to_i) + if valid?(metric, value, time) && + send_command("gauge", metric, value, time.to_i) + value + else + nil end + rescue Exception => e + report_exception(e) + nil end # Increment a metric, optionally more than one or at a specific time. # # agent.increment('users') def increment(metric, value = 1, time = Time.now) - valid?(metric, value, time) - send_command("increment", metric, value, time.to_i) + if valid?(metric, value, time) && + send_command("increment", metric, value, time.to_i) + value + else + nil + end + rescue Exception => e + report_exception(e) + nil end def enabled? @enabled end @@ -101,18 +116,25 @@ return false end true end + def report_exception(e) + logger.error "Exception occurred: #{e.message}" + logger.error e.backtrace.join("\n") + end + def send_command(cmd, *args) if enabled? cmd = "%s %s\n" % [cmd, args.collect(&:to_s).join(" ")] if @queue.size < MAX_BUFFER logger.debug "Queueing: #{cmd.chomp}" @queue << cmd + cmd else logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}" + nil end end end def test_server_connection @@ -123,16 +145,17 @@ # nop end end def start_connection_thread + logger.info "Starting thread" @thread = Thread.new do begin @socket = TCPSocket.new(host, port) @failures = 0 logger.info "connected to collector" - @socket.puts "hello version 0.0" + @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}" @socket.puts "authenticate #{@api_key}" loop do command_and_args = @queue.pop begin test_server_connection @@ -141,10 +164,11 @@ raise err end if command_and_args == 'exit' logger.info "exiting, #{@queue.size} commands remain" + @socket.flush Thread.exit else logger.debug "Sending: #{command_and_args.chomp}" @socket.puts command_and_args end @@ -171,6 +195,7 @@ end end end end end + end