lib/instrumental/agent.rb in instrumental_agent-0.8.3 vs lib/instrumental/agent.rb in instrumental_agent-0.9.0
- old
+ new
@@ -1,21 +1,28 @@
require 'instrumental/rack/middleware'
require 'instrumental/version'
require 'logger'
require 'thread'
require 'socket'
-require 'timeout'
+if RUBY_VERSION < "1.9"
+ require 'system_timer'
+else
+ require 'timeout'
+end
+
# Sets up a connection to the collector.
#
# Instrumental::Agent.new(API_KEY)
module Instrumental
class Agent
BACKOFF = 2.0
MAX_RECONNECT_DELAY = 15
MAX_BUFFER = 5000
REPLY_TIMEOUT = 10
+ CONNECT_TIMEOUT = 20
+ EXIT_FLUSH_TIMEOUT = 5
attr_accessor :host, :port, :synchronous, :queue
attr_reader :connection, :enabled
def self.logger=(l)
@@ -58,16 +65,18 @@
@host = collector[0]
@port = (collector[1] || 8000).to_i
@enabled = options[:enabled]
@test_mode = options[:test_mode]
@synchronous = options[:synchronous]
+ @allow_reconnect = true
@pid = Process.pid
if @enabled
@failures = 0
@queue = Queue.new
+ @sync_mutex = Mutex.new
start_connection_worker
setup_cleanup_at_exit
end
end
@@ -134,11 +143,11 @@
nil
end
# Send a notice to the server (deploys, downtime, etc.)
#
- # agent.notice('A notice')
+ # agent.notice('A notice')
def notice(note, time = Time.now, duration = 0)
if valid_note?(note)
send_command("notice", time.to_i, duration.to_i, note)
note
else
@@ -147,10 +156,23 @@
rescue Exception => e
report_exception(e)
nil
end
+ # Synchronously flush all pending metrics out to the server
+ # By default will not try to reconnect to the server if a
+ # connection failure happens during the flush, though you
+ # may optionally override this behavior by passing true.
+ #
+ # agent.flush
+ def flush(allow_reconnect = false)
+ queue_message('flush', {
+ :synchronous => true,
+ :allow_reconnect => allow_reconnect
+ })
+ end
+
# Reports whether the agent is enabled by returning the value held in
# @enabled as-is (no coercion to true/false is performed here).
def enabled?
@enabled
end
def connected?
@@ -165,10 +187,15 @@
@logger ||= self.class.logger
end
private
+ def with_timeout(time, &block)
+ tmr_klass = RUBY_VERSION < "1.9" ? SystemTimer : Timeout
+ tmr_klass.timeout(time) { yield }
+ end
+
# A note is valid when it contains neither a newline nor a carriage
# return character.
def valid_note?(note)
  line_break_at = note =~ /[\n\r]/
  !line_break_at
end
def valid?(metric, value, time)
@@ -205,80 +232,108 @@
@socket = nil
@queue = Queue.new
start_connection_worker
end
- cmd = "%s %s\n" % [cmd, args.collect { |v| v.to_s }.join(" ")]
+ cmd = "%s %s\n" % [cmd, args.collect { |a| a.to_s }.join(" ")]
if @queue.size < MAX_BUFFER
logger.debug "Queueing: #{cmd.chomp}"
- @main_thread = Thread.current if @synchronous
- @queue << cmd
- Thread.stop if @synchronous
- cmd
+ queue_message(cmd, { :synchronous => @synchronous })
else
logger.warn "Dropping command, queue full(#{@queue.size}): #{cmd.chomp}"
nil
end
end
end
+ def queue_message(message, options = {})
+ if @enabled
+ options ||= {}
+ if options[:allow_reconnect].nil?
+ options[:allow_reconnect] = @allow_reconnect
+ end
+ synchronous = options.delete(:synchronous)
+ if synchronous
+ options[:sync_resource] ||= ConditionVariable.new
+ @queue << [message, options]
+ @sync_mutex.synchronize {
+ options[:sync_resource].wait(@sync_mutex)
+ }
+ else
+ @queue << [message, options]
+ end
+ end
+ message
+ end
+
# Probes @socket with a one-byte non-blocking read. Errno::EAGAIN is
# swallowed; any other exception raised by the read propagates to the
# caller. A byte that is successfully read is discarded (see the TODO).
def test_connection
# FIXME: Test connection state hack
begin
@socket.read_nonblock(1) # TODO: put data back?
rescue Errno::EAGAIN
- # nop
+ # noop
end
end
# When the agent is enabled, calls disconnect and then starts a new
# worker thread, stored in @thread. Does nothing when disabled.
#
# Thread body: the removed lines called connection_worker in a loop
# until it returned a truthy value; the added line calls
# run_worker_loop once.
def start_connection_worker
if enabled?
disconnect
logger.info "Starting thread"
@thread = Thread.new do
- loop do
- break if connection_worker
- end
+ run_worker_loop
end
end
end
# Writes +message+ to @socket, then reads one reply line under a
# REPLY_TIMEOUT limit. Raises a RuntimeError ("Bad Response ...") unless
# the reply, with its trailing newline removed, is exactly "ok".
# Only the read is inside the timeout; the write is not.
def send_with_reply_timeout(message)
@socket.puts message
- Timeout.timeout(REPLY_TIMEOUT) do
+ with_timeout(REPLY_TIMEOUT) do
response = @socket.gets
# to_s lets a nil reply from gets fall through to the error below.
if response.to_s.chomp != "ok"
raise "Bad Response #{response.inspect} to #{message.inspect}"
end
end
end
- def connection_worker
+ def run_worker_loop
command_and_args = nil
+ command_options = nil
logger.info "connecting to collector"
- @socket = TCPSocket.new(host, port)
+ @socket = with_timeout(CONNECT_TIMEOUT) { TCPSocket.new(host, port) }
logger.info "connected to collector at #{host}:#{port}"
send_with_reply_timeout "hello version #{Instrumental::VERSION} test_mode #{@test_mode}"
send_with_reply_timeout "authenticate #{@api_key}"
@failures = 0
loop do
- command_and_args = @queue.pop
+ command_and_args, command_options = @queue.pop
+ sync_resource = command_options && command_options[:sync_resource]
test_connection
-
case command_and_args
when 'exit'
logger.info "exiting, #{@queue.size} commands remain"
return true
+ when 'flush'
+ release_resource = true
else
logger.debug "Sending: #{command_and_args.chomp}"
@socket.puts command_and_args
- command_and_args = nil
end
- @main_thread.run if @synchronous
+ command_and_args = nil
+ command_options = nil
+ if sync_resource
+ @sync_mutex.synchronize do
+ sync_resource.signal
+ end
+ end
end
rescue Exception => err
- logger.error err.to_s
+ logger.debug err.backtrace.join("\n")
+ if @allow_reconnect == false ||
+ (command_options && command_options[:allow_reconnect] == false)
+ logger.error "Not trying to reconnect"
+ return
+ end
if command_and_args
logger.debug "requeueing: #{command_and_args}"
@queue << command_and_args
end
disconnect
@@ -291,19 +346,23 @@
disconnect
end
# Registers an at_exit hook that shuts the worker thread down.
#
# Added lines: log the current state, set @allow_reconnect to false,
# queue an 'exit' message, then join the worker thread under an
# EXIT_FLUSH_TIMEOUT limit. On Timeout::Error an error is logged, with
# the number of still-queued items when the queue is not empty.
#
# Removed lines: acted only when the queue was non-empty and the thread
# alive; killed the thread if @failures > 0, otherwise queued 'exit' and
# joined the thread with no time limit.
def setup_cleanup_at_exit
at_exit do
- if !@queue.empty? && @thread.alive?
- if @failures > 0
- logger.info "exit received but disconnected, dropping #{@queue.size} commands"
- @thread.kill
+ logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}"
+ @allow_reconnect = false
+ logger.info "exit received, currently #{@queue.size} commands to be sent"
+ queue_message('exit')
+ begin
+ with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
+ rescue Timeout::Error
+ if @queue.size > 0
+ logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
else
- logger.info "exit received, #{@queue.size} commands to be sent"
- @queue << 'exit'
- @thread.join
+ logger.error "Timed out Instrumental Agent, exiting"
end
end
+
end
end
def disconnect
if connected?