lib/instrumental/agent.rb in instrumental_agent-0.9.1 vs lib/instrumental/agent.rb in instrumental_agent-0.9.5

- old
+ new

@@ -1,10 +1,11 @@ require 'instrumental/rack/middleware' require 'instrumental/version' require 'logger' require 'thread' require 'socket' + if RUBY_VERSION < "1.9" && RUBY_PLATFORM != "java" begin gem 'system_timer' require 'system_timer' InstrumentalTimeout = SystemTimer @@ -67,36 +68,32 @@ # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) # Instrumental::Agent.new(API_KEY, :collector => 'hostname:port') def initialize(api_key, options = {}) - default_options = { - :collector => 'instrumentalapp.com:8000', - :enabled => true, - :test_mode => false, - :synchronous => false - } - options = default_options.merge(options) - collector = options[:collector].split(':') + # symbolize options keys + options.replace( + options.inject({}) { |m, (k, v)| m[(k.to_sym rescue k) || k] = v; m } + ) - @api_key = api_key - @host = collector[0] - @port = (collector[1] || 8000).to_i - @enabled = options[:enabled] - @test_mode = options[:test_mode] - @synchronous = options[:synchronous] + # defaults + # host: instrumentalapp.com + # port: 8000 + # enabled: true + # test_mode: false + # synchronous: false + @api_key = api_key + @host, @port = options[:collector].to_s.split(':') + @host ||= 'instrumentalapp.com' + @port = (@port || 8000).to_i + @enabled = options.has_key?(:enabled) ? !!options[:enabled] : true + @test_mode = !!options[:test_mode] + @synchronous = !!options[:synchronous] + @pid = Process.pid @allow_reconnect = true - @pid = Process.pid - - if @enabled - @failures = 0 - @queue = Queue.new - @sync_mutex = Mutex.new - start_connection_worker - setup_cleanup_at_exit - end + setup_cleanup_at_exit if @enabled end # Store a gauge for a metric, optionally at a specific time. # # agent.gauge('load', 1.23) @@ -202,10 +199,27 @@ def logger @logger ||= self.class.logger end + # Stopping the agent will immediately stop all communication + # to Instrumental. If you call this and submit another metric, + # the agent will start again. + # + # Calling stop will cause all metrics waiting to be sent to be + # discarded. Don't call it unless you are expecting this behavior. + # + # agent.stop + # + def stop + disconnect + if @thread + @thread.kill + @thread = nil + end + end + private def with_timeout(time, &block) InstrumentalTimeout.timeout(time) { yield } end @@ -240,17 +254,11 @@ logger.error e.backtrace.join("\n") end def send_command(cmd, *args) if enabled? - if @pid != Process.pid - logger.info "Detected fork" - @pid = Process.pid - @socket = nil - @queue = Queue.new - start_connection_worker - end + start_connection_worker if !running? cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")] if @queue.size < MAX_BUFFER logger.debug "Queueing: #{cmd.chomp}" queue_message(cmd, { :synchronous => @synchronous }) @@ -291,10 +299,14 @@ end def start_connection_worker if enabled? disconnect + @pid = Process.pid + @queue = Queue.new + @sync_mutex = Mutex.new + @failures = 0 logger.info "Starting thread" @thread = Thread.new do run_worker_loop end end @@ -362,24 +374,29 @@ disconnect end def setup_cleanup_at_exit at_exit do - logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}" - @allow_reconnect = false - logger.info "exit received, currently #{@queue.size} commands to be sent" - queue_message('exit') - begin - with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join } - rescue Timeout::Error - if @queue.size > 0 - logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics" - else - logger.error "Timed out Instrumental Agent, exiting" + if running? + logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}" + @allow_reconnect = false + logger.info "exit received, currently #{@queue.size} commands to be sent" + queue_message('exit') + begin + with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join } + rescue Timeout::Error + if @queue.size > 0 + logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics" + else + logger.error "Timed out Instrumental Agent, exiting" + end end end - end + end + + def running? + !@thread.nil? && @pid == Process.pid end def disconnect if connected? logger.info "Disconnecting..."