lib/instrumental/agent.rb in instrumental_agent-3.0.0.beta vs lib/instrumental/agent.rb in instrumental_agent-3.0.0.beta2
- old
+ new
@@ -263,26 +263,26 @@
def cleanup
if running?
logger.info "Cleaning up agent, aggregator_size: #{@aggregator_queue.size}, thread_running: #{@aggregator_thread.alive?}"
logger.info "Cleaning up agent, queue size: #{@sender_queue.size}, thread running: #{@sender_thread.alive?}"
@allow_reconnect = false
- if @sender_queue.size > 0 || @aggregator_queue.size > 0
- @sender_queue << ['exit']
- @aggregator_queue << ['exit']
- begin
- with_timeout(EXIT_FLUSH_TIMEOUT) { @aggregator_thread.join }
- with_timeout(EXIT_FLUSH_TIMEOUT) { @sender_thread.join }
- rescue Timeout::Error
- total_size = @sender_queue&.size.to_i +
- @aggregator_queue&.size.to_i +
- @event_aggregator&.size.to_i
+ begin
+ with_timeout(EXIT_FLUSH_TIMEOUT) do
+ @aggregator_queue << ['exit']
+ @aggregator_thread.join
+ @sender_queue << ['exit']
+ @sender_thread.join
+ end
+ rescue Timeout::Error
+ total_size = @sender_queue&.size.to_i +
+ @aggregator_queue&.size.to_i +
+ @event_aggregator&.size.to_i
- if total_size > 0
- logger.error "Timed out working agent thread on exit, dropping #{total_size} metrics"
- else
- logger.error "Timed out Instrumental Agent, exiting"
- end
+ if total_size > 0
+ logger.error "Timed out working agent thread on exit, dropping #{total_size} metrics"
+ else
+ logger.error "Timed out Instrumental Agent, exiting"
end
end
end
end
@@ -411,24 +411,35 @@
return if running?
return unless enabled?
disconnect
address = ipv4_address_for_host(@host, @port)
if address
- @pid = Process.pid
+ new_pid = if @pid != Process.pid
+ @pid = Process.pid
+ true
+ else
+ false
+ end
+
@sync_mutex = Mutex.new
@failures = 0
@sockaddr_in = Socket.pack_sockaddr_in(@port, address)
logger.info "Starting aggregator thread"
if !@aggregator_thread&.alive?
+ if new_pid
+ @event_aggregator = nil
+ @aggregator_queue = Queue.new
+ end
@aggregator_thread = Thread.new do
run_aggregator_loop
end
end
if !@sender_thread&.alive?
logger.info "Starting sender thread"
+ @sender_queue = Queue.new if new_pid
@sender_thread = Thread.new do
run_sender_loop
end
end
end
@@ -490,12 +501,15 @@
rescue Timeout::Error
['forward', {}]
end
end
if command_and_args
- sync_resource = command_options && command_options[:sync_resource]
case command_and_args
when 'exit'
+ if !@event_aggregator.nil?
+ @sender_queue << @event_aggregator
+ @event_aggregator = nil
+ end
logger.info "Exiting, #{@aggregator_queue.size} commands remain"
return true
when 'flush'
if !@event_aggregator.nil?
@sender_queue << @event_aggregator