lib/instrumental/agent.rb in instrumental_agent-0.9.1 vs lib/instrumental/agent.rb in instrumental_agent-0.9.5
- old
+ new
@@ -1,10 +1,11 @@
require 'instrumental/rack/middleware'
require 'instrumental/version'
require 'logger'
require 'thread'
require 'socket'
+
if RUBY_VERSION < "1.9" && RUBY_PLATFORM != "java"
begin
gem 'system_timer'
require 'system_timer'
InstrumentalTimeout = SystemTimer
@@ -67,36 +68,32 @@
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
# Instrumental::Agent.new(API_KEY, :collector => 'hostname:port')
def initialize(api_key, options = {})
- default_options = {
- :collector => 'instrumentalapp.com:8000',
- :enabled => true,
- :test_mode => false,
- :synchronous => false
- }
- options = default_options.merge(options)
- collector = options[:collector].split(':')
+ # symbolize options keys
+ options.replace(
+ options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m }
+ )
- @api_key = api_key
- @host = collector[0]
- @port = (collector[1] || 8000).to_i
- @enabled = options[:enabled]
- @test_mode = options[:test_mode]
- @synchronous = options[:synchronous]
+ # defaults
+ # host: instrumentalapp.com
+ # port: 8000
+ # enabled: true
+ # test_mode: false
+ # synchronous: false
+ @api_key = api_key
+ @host, @port = options[:collector].to_s.split(':')
+ @host ||= 'instrumentalapp.com'
+ @port = (@port || 8000).to_i
+ @enabled = options.has_key?(:enabled) ? !!options[:enabled] : true
+ @test_mode = !!options[:test_mode]
+ @synchronous = !!options[:synchronous]
+ @pid = Process.pid
@allow_reconnect = true
- @pid = Process.pid
-
- if @enabled
- @failures = 0
- @queue = Queue.new
- @sync_mutex = Mutex.new
- start_connection_worker
- setup_cleanup_at_exit
- end
+ setup_cleanup_at_exit if @enabled
end
# Store a gauge for a metric, optionally at a specific time.
#
# agent.gauge('load', 1.23)
@@ -202,10 +199,27 @@
def logger
@logger ||= self.class.logger
end
+ # Stopping the agent will immediately stop all communication
+ # to Instrumental. If you call this and submit another metric,
+ # the agent will start again.
+ #
+ # Calling stop will cause all metrics waiting to be sent to be
+ # discarded. Don't call it unless you are expecting this behavior.
+ #
+ # agent.stop
+ #
+ def stop
+ disconnect
+ if @thread
+ @thread.kill
+ @thread = nil
+ end
+ end
+
private
def with_timeout(time, &block)
InstrumentalTimeout.timeout(time) { yield }
end
@@ -240,17 +254,11 @@
logger.error e.backtrace.join("\n")
end
def send_command(cmd, *args)
if enabled?
- if @pid != Process.pid
- logger.info "Detected fork"
- @pid = Process.pid
- @socket = nil
- @queue = Queue.new
- start_connection_worker
- end
+ start_connection_worker if !running?
cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
if @queue.size < MAX_BUFFER
logger.debug "Queueing: #{cmd.chomp}"
queue_message(cmd, { :synchronous => @synchronous })
@@ -291,10 +299,14 @@
end
def start_connection_worker
if enabled?
disconnect
+ @pid = Process.pid
+ @queue = Queue.new
+ @sync_mutex = Mutex.new
+ @failures = 0
logger.info "Starting thread"
@thread = Thread.new do
run_worker_loop
end
end
@@ -362,24 +374,29 @@
disconnect
end
def setup_cleanup_at_exit
at_exit do
- logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}"
- @allow_reconnect = false
- logger.info "exit received, currently #{@queue.size} commands to be sent"
- queue_message('exit')
- begin
- with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
- rescue Timeout::Error
- if @queue.size > 0
- logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
- else
- logger.error "Timed out Instrumental Agent, exiting"
+ if running?
+ logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}"
+ @allow_reconnect = false
+ logger.info "exit received, currently #{@queue.size} commands to be sent"
+ queue_message('exit')
+ begin
+ with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
+ rescue Timeout::Error
+ if @queue.size > 0
+ logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
+ else
+ logger.error "Timed out Instrumental Agent, exiting"
+ end
end
end
-
end
+ end
+
+ def running?
+ !@thread.nil? && @pid == Process.pid
end
def disconnect
if connected?
logger.info "Disconnecting..."