lib/instrumental/agent.rb in instrumental_agent-0.9.10 vs lib/instrumental/agent.rb in instrumental_agent-0.9.11
- old
+ new
@@ -225,10 +225,33 @@
@thread.kill
@thread = nil
end
end
+ # Called when a process is exiting to give it some extra time to
+ # push events to the service. An at_exit handler is automatically
+ # registered for this method, but can be called manually in cases
+ # where at_exit is bypassed like Resque workers.
+ def cleanup
+ if running?
+ logger.info "Cleaning up agent, queue size: #{@queue.size}, thread running: #{@thread.alive?}"
+ @allow_reconnect = false
+ if @queue.size > 0
+ queue_message('exit')
+ begin
+ with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
+ rescue Timeout::Error
+ if @queue.size > 0
+ logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
+ else
+ logger.error "Timed out Instrumental Agent, exiting"
+ end
+ end
+ end
+ end
+ end
+
private
def with_timeout(time, &block)
InstrumentalTimeout.timeout(time) { yield }
end
@@ -344,11 +367,11 @@
command_and_args, command_options = @queue.pop
sync_resource = command_options && command_options[:sync_resource]
test_connection
case command_and_args
when 'exit'
- logger.info "exiting, #{@queue.size} commands remain"
+ logger.info "Exiting, #{@queue.size} commands remain"
return true
when 'flush'
release_resource = true
else
logger.debug "Sending: #{command_and_args.chomp}"
@@ -361,10 +384,11 @@
sync_resource.signal
end
end
end
rescue Exception => err
+ logger.debug err.to_s
logger.debug err.backtrace.join("\n")
if @allow_reconnect == false ||
(command_options && command_options[:allow_reconnect] == false)
logger.error "Not trying to reconnect"
return
@@ -383,24 +407,10 @@
disconnect
end
def setup_cleanup_at_exit
at_exit do
- if running?
- logger.info "Cleaning up agent, queue empty: #{@queue.empty?}, thread running: #{@thread.alive?}"
- @allow_reconnect = false
- logger.info "exit received, currently #{@queue.size} commands to be sent"
- queue_message('exit')
- begin
- with_timeout(EXIT_FLUSH_TIMEOUT) { @thread.join }
- rescue Timeout::Error
- if @queue.size > 0
- logger.error "Timed out working agent thread on exit, dropping #{@queue.size} metrics"
- else
- logger.error "Timed out Instrumental Agent, exiting"
- end
- end
- end
+ cleanup
end
end
def running?
!@thread.nil? && @pid == Process.pid