lib/instrumental/agent.rb in instrumental_agent-0.3.0 vs lib/instrumental/agent.rb in instrumental_agent-0.4.0
- old
+ new
@@ -18,12 +18,12 @@
# Class-level setter: stores the given object in @logger, which the
# class-level logger reader returns in preference to its fallback.
def self.logger=(l)
@logger = l
end
# Class-level logger reader. Returns @logger, lazily creating a Logger that
# writes to /dev/null the first time it is called with nothing assigned.
#
# Diff note: 0.3.0 passed the path string to Logger.new; 0.4.0 passes a File
# opened in append mode and adds an optional `force` argument.
# NOTE(review): `force` is not referenced anywhere in the body shown here.
- def self.logger
- @logger ||= Logger.new('/dev/null')
+ def self.logger(force = false)
+ @logger ||= Logger.new(File.open('/dev/null', 'a')) # append mode so it's forksafe
end
# Returns the @agents array held on the receiver, creating an empty array on
# first use so that the same object is handed back on later calls.
def self.all
@agents ||= []
end
@@ -50,15 +50,18 @@
@api_key = api_key
@host = collector[0]
@port = (collector[1] || 8000).to_i
@enabled = options[:enabled]
@test_mode = options[:test_mode]
+ @pid = Process.pid
+
if @enabled
@failures = 0
@queue = Queue.new
- start_connection_thread
+ start_connection_worker
+ setup_cleanup_at_exit
end
end
# Store a gauge for a metric, optionally at a specific time.
#
@@ -93,11 +96,11 @@
# Returns the value of @enabled unchanged (whatever was assigned to it, not
# necessarily a strict true/false).
def enabled?
@enabled
end
# Reports whether a usable connection currently exists.
#
# Diff note: 0.3.0 asked a `connection` object for its `connected` state;
# 0.4.0 checks @socket directly and yields nil when @socket is nil, otherwise
# true/false depending on whether the socket has been closed.
def connected?
- connection && connection.connected
+ @socket && !@socket.closed?
end
# Instance-level convenience reader: delegates to the logger exposed by this
# object's class.
def logger
self.class.logger
end
@@ -123,10 +126,18 @@
logger.error e.backtrace.join("\n")
end
# Formats a command line ("<cmd> <arg> <arg>...\n", each arg via to_s) and
# places it on @queue for the connection worker. Does nothing (returns nil
# implicitly) when the agent is not enabled.
#
# When the queue holds fewer than MAX_BUFFER entries the formatted string is
# queued and returned. The diff hunk boundary below elides the lines of the
# other branch; the visible tail of that branch evaluates to nil.
#
# Diff note (added in 0.4.0): before queueing, the current PID is compared with
# the one remembered in @pid. On a mismatch the new PID is recorded, the
# @socket reference is dropped without being closed here, @queue is replaced
# with a fresh Queue, and a new connection worker is started.
# NOTE(review): replacing @queue discards any commands that were queued but
# unsent in the object's previous queue -- presumably intentional so the child
# does not resend the parent's data; confirm.
def send_command(cmd, *args)
if enabled?
+ if @pid != Process.pid
+ logger.info "Detected fork"
+ @pid = Process.pid
+ @socket = nil
+ @queue = Queue.new
+ start_connection_worker
+ end
+
cmd = "%s %s\n" % [cmd, args.collect(&:to_s).join(" ")]
if @queue.size < MAX_BUFFER
logger.debug "Queueing: #{cmd.chomp}"
@queue << cmd
cmd
@@ -135,56 +146,70 @@
nil
end
end
end
# Probes the socket by attempting a one-byte non-blocking read.
# Errno::EAGAIN is swallowed; any other exception raised by the read
# propagates to the caller.
#
# Diff note: renamed from test_server_connection to test_connection in 0.4.0.
# NOTE(review): a byte that is actually read is discarded (see the TODO below).
- def test_server_connection
+ def test_connection
# FIXME: Test connection state hack
begin
@socket.read_nonblock(1) # TODO: put data back?
rescue Errno::EAGAIN
# nop
end
end
# Starts the background thread that talks to the collector.
#
# Diff note: this span interleaves the removed 0.3.0 method
# (start_connection_thread, `-` lines) with its 0.4.0 replacement
# (start_connection_worker, `+` lines); unmarked lines are shared by both.
#
# 0.3.0: the thread body itself opened the TCPSocket, sent the hello and
# authenticate lines, popped and sent queued commands, and handled
# reconnect/backoff in its own rescue/retry.
#
# 0.4.0: only when enabled?, calls disconnect first, logs "Starting thread",
# then runs a thread that calls connection_worker in a loop and leaves the
# loop once that call returns a truthy value. Connecting, sending and retrying
# now live in connection_worker.
- def start_connection_thread
- logger.info "Starting thread"
- @thread = Thread.new do
- begin
- @socket = TCPSocket.new(host, port)
- @failures = 0
- logger.info "connected to collector"
- @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}"
- @socket.puts "authenticate #{@api_key}"
+ def start_connection_worker
+ if enabled?
+ disconnect
+ logger.info "Starting thread"
+ @thread = Thread.new do
loop do
- command_and_args = @queue.pop
- begin
- test_server_connection
- rescue Exception => err
- @queue << command_and_args # connection dead, requeue
- raise err
- end
-
- if command_and_args == 'exit'
- logger.info "exiting, #{@queue.size} commands remain"
- @socket.flush
- Thread.exit
- else
- logger.debug "Sending: #{command_and_args.chomp}"
- @socket.puts command_and_args
- end
+ break if connection_worker
end
- rescue Exception => err
- logger.error err.to_s
- # FIXME: not always a disconnect
- @failures += 1
- delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
- logger.info "disconnected, reconnect in #{delay}..."
- sleep delay
- retry
end
end
+ end
+
# Body of the connection thread (added in 0.4.0; every line is a `+` line).
#
# Opens a TCPSocket to host:port, resets @failures to 0, sends the "hello" and
# "authenticate" lines, then loops: pop one entry from @queue, probe the
# socket with test_connection, and either return true when the entry is the
# 'exit' sentinel or write the entry to the socket.
#
# command_and_args is cleared to nil only after a successful write, so the
# rescue clause can tell whether an entry was popped but not yet sent and push
# it back onto @queue. Because the probe runs before the entry is inspected,
# a popped 'exit' sentinel is requeued as well if the probe raises.
#
# On any exception: logs the message, requeues the unsent entry if there is
# one, disconnects, increments @failures, sleeps for
# min((@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY) and retries the method
# body from the top. The ensure clause disconnects when the method is left.
#
# NOTE(review): the requeued entry is appended with <<, so it may end up
# behind entries queued in the meantime -- confirm ordering is not required.
# NOTE(review): `rescue Exception` combined with an unbounded `retry` means
# this method only returns via the 'exit' sentinel; confirm that is intended.
+ def connection_worker
+ command_and_args = nil
+ logger.info "connecting to collector"
+ @socket = TCPSocket.new(host, port)
+ @failures = 0
+ logger.info "connected to collector at #{host}:#{port}"
+ @socket.puts "hello version #{Instrumental::VERSION} test_mode #{@test_mode}"
+ @socket.puts "authenticate #{@api_key}"
+ loop do
+ command_and_args = @queue.pop
+ test_connection
+
+ case command_and_args
+ when 'exit'
+ logger.info "exiting, #{@queue.size} commands remain"
+ return true
+ else
+ logger.debug "Sending: #{command_and_args.chomp}"
+ @socket.puts command_and_args
+ command_and_args = nil
+ end
+ end
+ rescue Exception => err
+ logger.error err.to_s
+ if command_and_args
+ logger.debug "requeueing: #{command_and_args}"
+ @queue << command_and_args
+ end
+ disconnect
+ @failures += 1
+ delay = [(@failures - 1) ** BACKOFF, MAX_RECONNECT_DELAY].min
+ logger.info "disconnected, reconnect in #{delay}..."
+ sleep delay
+ retry
+ ensure
+ disconnect
+ end
+
# Registers an at_exit hook that deals with commands still sitting in @queue
# when the process exits.
#
# The hook only acts when the queue is non-empty and the worker thread is
# alive. If there have been connection failures (@failures > 0) it logs that
# the remaining commands are being dropped and kills the thread. The diff hunk
# boundary below elides the start of the other branch; its visible tail joins
# the thread.
#
# Diff note: in 0.4.0 the at_exit block is wrapped in its own method; the
# at_exit lines themselves are unchanged context.
+ def setup_cleanup_at_exit
at_exit do
if !@queue.empty? && @thread.alive?
if @failures > 0
logger.info "exit received but disconnected, dropping #{@queue.size} commands"
@thread.kill
@@ -194,8 +219,18 @@
@thread.join
end
end
end
end
+
# Tears down the current connection (added in 0.4.0).
# When connected?, logs, flushes and closes the socket. @socket is then set to
# nil whether or not anything was closed.
+ def disconnect
+ if connected?
+ logger.info "Disconnecting..."
+ @socket.flush
+ @socket.close
+ end
+ @socket = nil
+ end
+
end
end