lib/instrumental/agent.rb in instrumental_agent-0.1.6 vs lib/instrumental/agent.rb in instrumental_agent-0.2.0
- old
+ new
@@ -1,25 +1,23 @@
require 'instrumental/rack/middleware'
require 'instrumental/version'
-require 'eventmachine'
require 'logger'
+require 'thread'
+require 'socket'
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
module Instrumental
class Agent
+ BACKOFF = 2.0
+ MAX_RECONNECT_DELAY = 15
+ MAX_BUFFER = 100
+
attr_accessor :host, :port
attr_reader :connection, :enabled
- def self.start_reactor
- unless EM.reactor_running?
- logger.debug 'Starting EventMachine reactor'
- Thread.new { EM.run }
- end
- end
-
def self.logger=(l)
@logger = l
end
def self.logger
@@ -34,73 +32,16 @@
inst = super
all << inst
inst
end
- module ServerConnection
- BACKOFF = 2
- MAX_RECONNECT_DELAY = 5
- MAX_BUFFER = 10
-
- attr_accessor :agent
- attr_reader :connected, :failures, :buffer
-
- def initialize(agent, api_key)
- @agent = agent
- @buffer = []
- @api_key = api_key
- end
-
- def logger
- agent.logger
- end
-
- def connection_completed
- logger.info "connected to collector"
- @connected = true
- @failures = 0
- send_data("hello version #{Instrumental::VERSION}\n")
- send_data("authenticate #{@api_key}\n") if @api_key
- dropped = @buffer.dup
- @buffer = []
- dropped.each do |msg|
- send_data(msg)
- end
- end
-
- def receive_data(data)
- logger.debug "Received: #{data.chomp}"
- end
-
- def send_data(data)
- if @connected
- super
- else
- if @buffer.size < MAX_BUFFER
- @buffer << data
- end
- end
- end
-
- def unbind
- @connected = false
- @failures = @failures.to_i + 1
- delay = [@failures ** BACKOFF / 10.to_f, MAX_RECONNECT_DELAY].min
- logger.info "disconnected, reconnect in #{delay}..."
- EM::Timer.new(delay) do
- reconnect(agent.host, agent.port)
- end
- end
-
- end
-
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
# Instrumental::Agent.new(API_KEY, :collector => 'hostname:port')
def initialize(api_key, options = {})
- default_options = { :start_reactor => true, :enabled => true }
+ default_options = { :enabled => true }
options = default_options.merge(options)
@api_key = api_key
if options[:collector]
@host, @port = options[:collector].split(':')
@port = (@port || 8000).to_i
@@ -108,19 +49,14 @@
@host = 'instrumentalapp.com'
@port = 8000
end
@enabled = options[:enabled]
-
if @enabled
- if options[:start_reactor]
- self.class.start_reactor
- end
-
- EM.next_tick do
- @connection = EM.connect host, port, ServerConnection, self, api_key
- end
+ @failures = 0
+ @queue = Queue.new
+ start_connection_thread
end
end
# Store a gauge for a metric, optionally at a specific time.
#
@@ -168,15 +104,73 @@
end
def send_command(cmd, *args)
if enabled?
cmd = "%s %s\n" % [cmd, args.collect(&:to_s).join(" ")]
- logger.debug "Sending: #{cmd.chomp}"
- EM.next_tick do
- connection.send_data(cmd)
+ if @queue.size < MAX_BUFFER
+ logger.debug "Queueing: #{cmd.chomp}"
+ @queue << cmd
+ else
+ logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
end
end
end
+ def test_server_connection
+ # FIXME: Test connection state hack
+ begin
+ @socket.read_nonblock(1) # TODO: put data back?
+ rescue Errno::EAGAIN
+ # nop
+ end
+ end
+ def start_connection_thread
+ @thread = Thread.new do
+ begin
+ @socket = TCPSocket.new(host, port)
+ @failures = 0
+ logger.info "connected to collector"
+ @socket.puts "hello version 0.0"
+ @socket.puts "authenticate #{@api_key}"
+ loop do
+ command_and_args = @queue.pop
+ begin
+ test_server_connection
+ rescue Exception => err
+ @queue << command_and_args # connection dead, requeue
+ raise err
+ end
+
+ if command_and_args == 'exit'
+ logger.info "exiting, #{@queue.size} commands remain"
+ Thread.exit
+ else
+ logger.debug "Sending: #{command_and_args.chomp}"
+ @socket.puts command_and_args
+ end
+ end
+ rescue Exception => err
+ logger.error err.to_s
+ # FIXME: not always a disconnect
+ @failures += 1
+ delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
+ logger.info "disconnected, reconnect in #{delay}..."
+ sleep delay
+ retry
+ end
+ end
+ at_exit do
+ if !@queue.empty? && @thread.alive?
+ if @failures > 0
+ logger.info "exit received but disconnected, dropping #{@queue.size} commands"
+ @thread.kill
+ else
+ logger.info "exit received, #{@queue.size} commands to be sent"
+ @queue << 'exit'
+ @thread.join
+ end
+ end
+ end
+ end
end
end