lib/instrumental/agent.rb in instrumental_agent-0.2.0 vs lib/instrumental/agent.rb in instrumental_agent-0.3.0
- old
+ new
@@ -13,11 +13,11 @@
MAX_RECONNECT_DELAY = 15
MAX_BUFFER = 100
attr_accessor :host, :port
attr_reader :connection, :enabled
-
+
def self.logger=(l)
@logger = l
end
def self.logger
@@ -37,22 +37,24 @@
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
# Instrumental::Agent.new(API_KEY, :collector => 'hostname:port')
def initialize(api_key, options = {})
- default_options = { :enabled => true }
- options = default_options.merge(options)
- @api_key = api_key
- if options[:collector]
- @host, @port = options[:collector].split(':')
- @port = (@port || 8000).to_i
- else
- @host = 'instrumentalapp.com'
- @port = 8000
- end
+ default_options = {
+ :collector => 'instrumentalapp.com:8000',
+ :enabled => true,
+ :test_mode => false,
+ }
+ options = default_options.merge(options)
+ collector = options[:collector].split(':')
- @enabled = options[:enabled]
+ @api_key = api_key
+ @host = collector[0]
+ @port = (collector[1] || 8000).to_i
+ @enabled = options[:enabled]
+ @test_mode = options[:test_mode]
+
if @enabled
@failures = 0
@queue = Queue.new
start_connection_thread
end
@@ -60,21 +62,34 @@
# Store a gauge for a metric, optionally at a specific time.
#
# agent.gauge('load', 1.23)
def gauge(metric, value, time = Time.now)
- if valid?(metric, value, time)
- send_command("gauge", metric, value, time.to_i)
+ if valid?(metric, value, time) &&
+ send_command("gauge", metric, value, time.to_i)
+ value
+ else
+ nil
end
+ rescue Exception => e
+ report_exception(e)
+ nil
end
# Increment a metric, optionally more than one or at a specific time.
#
# agent.increment('users')
def increment(metric, value = 1, time = Time.now)
- valid?(metric, value, time)
- send_command("increment", metric, value, time.to_i)
+ if valid?(metric, value, time) &&
+ send_command("increment", metric, value, time.to_i)
+ value
+ else
+ nil
+ end
+ rescue Exception => e
+ report_exception(e)
+ nil
end
def enabled?
@enabled
end
@@ -101,18 +116,25 @@
return false
end
true
end
+ def report_exception(e)
+ logger.error "Exception occurred: #{e.message}"
+ logger.error e.backtrace.join("\n")
+ end
+
def send_command(cmd, *args)
if enabled?
cmd = "%s %s\n" % [cmd, args.collect(&:to_s).join(" ")]
if @queue.size < MAX_BUFFER
logger.debug "Queueing: #{cmd.chomp}"
@queue << cmd
+ cmd
else
logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
+ nil
end
end
end
def test_server_connection
@@ -123,16 +145,17 @@
# nop
end
end
def start_connection_thread
+ logger.info "Starting thread"
@thread = Thread.new do
begin
@socket = TCPSocket.new(host, port)
@failures = 0
logger.info "connected to collector"
- @socket.puts "hello version 0.0"
+ @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}"
@socket.puts "authenticate #{@api_key}"
loop do
command_and_args = @queue.pop
begin
test_server_connection
@@ -141,10 +164,11 @@
raise err
end
if command_and_args == 'exit'
logger.info "exiting, #{@queue.size} commands remain"
+ @socket.flush
Thread.exit
else
logger.debug "Sending: #{command_and_args.chomp}"
@socket.puts command_and_args
end
@@ -171,6 +195,7 @@
end
end
end
end
end
+
end