lib/instrumental/agent.rb in instrumental_agent-0.8.3 vs lib/instrumental/agent.rb in instrumental_agent-0.9.0

- old
+ new

@@ -1,21 +1,28 @@ require 'instrumental/rack/middleware' require 'instrumental/version' require 'logger' require 'thread' require 'socket' -require 'timeout' +if RUBY_VERSION < "1.9" + require 'system_timer' +else + require 'timeout' +end + # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) module Instrumental class Agent BACKOFF = 2.0 MAX_RECONNECT_DELAY = 15 MAX_BUFFER = 5000 REPLY_TIMEOUT = 10 + CONNECT_TIMEOUT = 20 + EXIT_FLUSH_TIMEOUT = 5 attr_accessor :host, :port, :synchronous, :queue attr_reader :connection, :enabled def self.logger=(l) @@ -58,16 +65,18 @@ @host = collector[0] @port = (collector[1] || 8000).to_i @enabled = options[:enabled] @test_mode = options[:test_mode] @synchronous = options[:synchronous] + @allow_reconnect = true @pid = Process.pid if @enabled @failures = 0 @queue = Queue.new + @sync_mutex = Mutex.new start_connection_worker setup_cleanup_at_exit end end @@ -134,11 +143,11 @@ nil end # Send a notice to the server (deploys, downtime, etc.) # - # agent.notice('A notice') + # agent.notice('A notice') def notice(note, time = Time.now, duration = 0) if valid_note?(note) send_command("notice", time.to_i, duration.to_i, note) note else @@ -147,10 +156,23 @@ rescue Exception => e report_exception(e) nil end + # Synchronously flush all pending metrics out to the server + # By default will not try to reconnect to the server if a + # connection failure happens during the flush, though you + # may optionally override this behavior by passing true. + # + # agent.flush + def flush(allow_reconnect = false) + queue_message('flush', { + :synchronous => true, + :allow_reconnect => allow_reconnect + }) + end + def enabled? @enabled end def connected? @@ -165,10 +187,15 @@ @logger ||= self.class.logger end private + def with_timeout(time, &block) + tmr_klass = RUBY_VERSION < "1.9" ? SystemTimer : Timeout + tmr_klass.timeout(time) { yield } + end + def valid_note?(note) note !~ /[\n\r]/ end def valid?(metric, value, time) @@ -205,80 +232,108 @@ @socket = nil @queue = Queue.new start_connection_worker end - cmd = "%s %s\n" % [cmd, args.collect { |v| v.to_s }.join(" ")] + cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")] if @queue.size < MAX_BUFFER logger.debug "Queueing: #{cmd.chomp}" - @main_thread = Thread.current if @synchronous - @queue << cmd - Thread.stop if @synchronous - cmd + queue_message(cmd, { :synchronous => @synchronous }) else logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}" nil end end end + def queue_message(message, options = {}) + if @enabled + options ||= {} + if options[:allow_reconnect].nil? + options[:allow_reconnect] = @allow_reconnect + end + synchronous = options.delete(:synchronous) + if synchronous + options[:sync_resource] ||= ConditionVariable.new + @queue << [message, options] + @sync_mutex.synchronize { + options[:sync_resource].wait(@sync_mutex) + } + else + @queue << [message, options] + end + end + message + end + def test_connection # FIXME: Test connection state hack begin @socket.read_nonblock(1) # TODO: put data back? rescue Errno::EAGAIN - # nop + # noop end end def start_connection_worker if enabled? disconnect logger.info "Starting thread" @thread = Thread.new do - loop do - break if connection_worker - end + run_worker_loop end end end def send_with_reply_timeout(message) @socket.puts message - Timeout.timeout(REPLY_TIMEOUT) do + with_timeout(REPLY_TIMEOUT) do response = @socket.gets if response.to_s.chomp != "ok" raise "Bad Response #{response.inspect} to #{message.inspect}" end end end - def connection_worker + def run_worker_loop command_and_args = nil + command_options = nil logger.info "connecting to collector" - @socket = TCPSocket.new(host, port) + @socket = with_timeout(CONNECT_TIMEOUT) { TCPSocket.new(host, port) } logger.info "connected to collector at #{host}:#{port}" send_with_reply_timeout "hello version #{Instrumental::VERSION} test_mode #{@test_mode}" send_with_reply_timeout "authenticate #{@api_key}" @failures = 0 loop do - command_and_args = @queue.pop + command_and_args, command_options = @queue.pop + sync_resource = command_options && command_options[:sync_resource] test_connection - case command_and_args when 'exit' logger.info "exiting, #{@queue.size} commands remain" return true + when 'flush' + release_resource = true else logger.debug "Sending: #{command_and_args.chomp}" @socket.puts command_and_args - command_and_args = nil end - @main_thread.run if @synchronous + command_and_args = nil + command_options = nil + if sync_resource + @sync_mutex.synchronize do + sync_resource.signal + end + end end rescue Exception => err - logger.error err.to_s + logger.debug err.backtrace.join("\n") + if @allow_reconnect == false || + (command_options && command_options[:allow_reconnect] == false) + logger.error "Not trying to reconnect" + return + end if command_and_args logger.debug "requeueing: #{command_and_args}" @queue << command_and_args end disconnect @@ -291,19 +346,23 @@ disconnect end def setup_cleanup_at_exit at_exit do - if !@queue.empty? && @thread.alive? - if @failures > 0 - logger.info "exit received but disconnected, dropping #{@queue.size} commands" - @thread.kill + logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}" + @allow_reconnect = false + logger.info "exit received, currently #{@queue.size} commands to be sent" + queue_message('exit') + begin + with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join } + rescue Timeout::Error + if @queue.size > 0 + logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics" else - logger.info "exit received, #{@queue.size} commands to be sent" - @queue << 'exit' - @thread.join + logger.error "Timed out Instrumental Agent, exiting" end end + end end def disconnect if connected?