lib/instrumental/agent.rb in instrumental_agent-0.1.6 vs lib/instrumental/agent.rb in instrumental_agent-0.2.0

- old
+ new

@@ -1,25 +1,23 @@ require 'instrumental/rack/middleware' require 'instrumental/version' -require 'eventmachine' require 'logger' +require 'thread' +require 'socket' # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) module Instrumental class Agent + BACKOFF = 2.0 + MAX_RECONNECT_DELAY = 15 + MAX_BUFFER = 100 + attr_accessor :host, :port attr_reader :connection, :enabled - def self.start_reactor - unless EM.reactor_running? - logger.debug 'Starting EventMachine reactor' - Thread.new { EM.run } - end - end - def self.logger=(l) @logger = l end def self.logger @@ -34,73 +32,16 @@ inst = super all << inst inst end - module ServerConnection - BACKOFF = 2 - MAX_RECONNECT_DELAY = 5 - MAX_BUFFER = 10 - - attr_accessor :agent - attr_reader :connected, :failures, :buffer - - def initialize(agent, api_key) - @agent = agent - @buffer = [] - @api_key = api_key - end - - def logger - agent.logger - end - - def connection_completed - logger.info "connected to collector" - @connected = true - @failures = 0 - send_data("hello version #{Instrumental::VERSION}\n") - send_data("authenticate #{@api_key}\n") if @api_key - dropped = @buffer.dup - @buffer = [] - dropped.each do |msg| - send_data(msg) - end - end - - def receive_data(data) - logger.debug "Received: #{data.chomp}" - end - - def send_data(data) - if @connected - super - else - if @buffer.size < MAX_BUFFER - @buffer << data - end - end - end - - def unbind - @connected = false - @failures = @failures.to_i + 1 - delay = [@failures ** BACKOFF / 10.to_f, MAX_RECONNECT_DELAY].min - logger.info "disconnected, reconnect in #{delay}..." - EM::Timer.new(delay) do - reconnect(agent.host, agent.port) - end - end - - end - # Sets up a connection to the collector. # # Instrumental::Agent.new(API_KEY) # Instrumental::Agent.new(API_KEY, :collector => 'hostname:port') def initialize(api_key, options = {}) - default_options = { :start_reactor => true, :enabled => true } + default_options = { :enabled => true } options = default_options.merge(options) @api_key = api_key if options[:collector] @host, @port = options[:collector].split(':') @port = (@port || 8000).to_i @@ -108,19 +49,14 @@ @host = 'instrumentalapp.com' @port = 8000 end @enabled = options[:enabled] - if @enabled - if options[:start_reactor] - self.class.start_reactor - end - - EM.next_tick do - @connection = EM.connect host, port, ServerConnection, self, api_key - end + @failures = 0 + @queue = Queue.new + start_connection_thread end end # Store a gauge for a metric, optionally at a specific time. # @@ -168,15 +104,73 @@ end def send_command(cmd, *args) if enabled? cmd = "%s %s\n" % [cmd, args.collect(&:to_s).join(" ")] - logger.debug "Sending: #{cmd.chomp}" - EM.next_tick do - connection.send_data(cmd) + if @queue.size < MAX_BUFFER + logger.debug "Queueing: #{cmd.chomp}" + @queue << cmd + else + logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}" end end end + def test_server_connection + # FIXME: Test connection state hack + begin + @socket.read_nonblock(1) # TODO: put data back? + rescue Errno::EAGAIN + # nop + end + end + def start_connection_thread + @thread = Thread.new do + begin + @socket = TCPSocket.new(host, port) + @failures = 0 + logger.info "connected to collector" + @socket.puts "hello version 0.0" + @socket.puts "authenticate #{@api_key}" + loop do + command_and_args = @queue.pop + begin + test_server_connection + rescue Exception => err + @queue << command_and_args # connection dead, requeue + raise err + end + + if command_and_args == 'exit' + logger.info "exiting, #{@queue.size} commands remain" + Thread.exit + else + logger.debug "Sending: #{command_and_args.chomp}" + @socket.puts command_and_args + end + end + rescue Exception => err + logger.error err.to_s + # FIXME: not always a disconnect + @failures += 1 + delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min + logger.info "disconnected, reconnect in #{delay}..." + sleep delay + retry + end + end + at_exit do + if !@queue.empty? && @thread.alive? + if @failures > 0 + logger.info "exit received but disconnected, dropping #{@queue.size} commands" + @thread.kill + else + logger.info "exit received, #{@queue.size} commands to be sent" + @queue << 'exit' + @thread.join + end + end + end + end end end