lib/instrumental/agent.rb in instrumental_agent-0.3.0 vs lib/instrumental/agent.rb in instrumental_agent-0.4.0

- old
+ new

@@ -18,12 +18,12 @@ def self.logger=(l) @logger = l end - def self.logger - @logger ||= Logger.new('/dev/null') + def self.logger(force = false) + @logger ||= Logger.new(File.open('/dev/null', 'a')) # append mode so it's forksafe end def self.all @agents ||= [] end @@ -50,15 +50,18 @@ @api_key = api_key @host = collector[0] @port = (collector[1] || 8000).to_i @enabled = options[:enabled] @test_mode = options[:test_mode] + @pid = Process.pid + if @enabled @failures = 0 @queue = Queue.new - start_connection_thread + start_connection_worker + setup_cleanup_at_exit end end # Store a gauge for a metric, optionally at a specific time. # @@ -93,11 +96,11 @@ def enabled? @enabled end def connected? - connection && connection.connected + @socket && !@socket.closed? end def logger self.class.logger end @@ -123,10 +126,18 @@ logger.error e.backtrace.join("\n") end def send_command(cmd, *args) if enabled? + if @pid != Process.pid + logger.info "Detected fork" + @pid = Process.pid + @socket = nil + @queue = Queue.new + start_connection_worker + end + cmd = "%s %s\n" % [cmd, args.collect(&:to_s).join(" ")] if @queue.size < MAX_BUFFER logger.debug "Queueing: #{cmd.chomp}" @queue << cmd cmd @@ -135,56 +146,70 @@ nil end end end - def test_server_connection + def test_connection # FIXME: Test connection state hack begin @socket.read_nonblock(1) # TODO: put data back? rescue Errno::EAGAIN # nop end end - def start_connection_thread - logger.info "Starting thread" - @thread = Thread.new do - begin - @socket = TCPSocket.new(host, port) - @failures = 0 - logger.info "connected to collector" - @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}" - @socket.puts "authenticate #{@api_key}" + def start_connection_worker + if enabled? + disconnect + logger.info "Starting thread" + @thread = Thread.new do loop do - command_and_args = @queue.pop - begin - test_server_connection - rescue Exception => err - @queue << command_and_args # connection dead, requeue - raise err - end - - if command_and_args == 'exit' - logger.info "exiting, #{@queue.size} commands remain" - @socket.flush - Thread.exit - else - logger.debug "Sending: #{command_and_args.chomp}" - @socket.puts command_and_args - end + break if connection_worker end - rescue Exception => err - logger.error err.to_s - # FIXME: not always a disconnect - @failures += 1 - delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min - logger.info "disconnected, reconnect in #{delay}..." - sleep delay - retry end end + end + + def connection_worker + command_and_args = nil + logger.info "connecting to collector" + @socket = TCPSocket.new(host, port) + @failures = 0 + logger.info "connected to collector at #{host}:#{port}" + @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}" + @socket.puts "authenticate #{@api_key}" + loop do + command_and_args = @queue.pop + test_connection + + case command_and_args + when 'exit' + logger.info "exiting, #{@queue.size} commands remain" + return true + else + logger.debug "Sending: #{command_and_args.chomp}" + @socket.puts command_and_args + command_and_args = nil + end + end + rescue Exception => err + logger.error err.to_s + if command_and_args + logger.debug "requeueing: #{command_and_args}" + @queue << command_and_args + end + disconnect + @failures += 1 + delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min + logger.info "disconnected, reconnect in #{delay}..." + sleep delay + retry + ensure + disconnect + end + + def setup_cleanup_at_exit at_exit do if !@queue.empty? && @thread.alive? if @failures > 0 logger.info "exit received but disconnected, dropping #{@queue.size} commands" @thread.kill @@ -194,8 +219,18 @@ @thread.join end end end end + + def disconnect + if connected? + logger.info "Disconnecting..." + @socket.flush + @socket.close + end + @socket = nil + end + end end