lib/instrumental/agent.rb in instrumental_agent-3.0.0.beta vs lib/instrumental/agent.rb in instrumental_agent-3.0.0.beta2

- old
+ new

@@ -263,26 +263,26 @@ def cleanup if running? logger.info "Cleaning up agent, aggregator_size: #{@aggregator_queue.size}, thread_running: #{@aggregator_thread.alive?}" logger.info "Cleaning up agent, queue size: #{@sender_queue.size}, thread running: #{@sender_thread.alive?}" @allow_reconnect = false - if @sender_queue.size > 0 || @aggregator_queue.size > 0 - @sender_queue << ['exit'] - @aggregator_queue << ['exit'] - begin - with_timeout(EXIT_FLUSH_TIMEOUT) { @aggregator_thread.join } - with_timeout(EXIT_FLUSH_TIMEOUT) { @sender_thread.join } - rescue Timeout::Error - total_size = @sender_queue&.size.to_i + - @aggregator_queue&.size.to_i + - @event_aggregator&.size.to_i + begin + with_timeout(EXIT_FLUSH_TIMEOUT) do + @aggregator_queue << ['exit'] + @aggregator_thread.join + @sender_queue << ['exit'] + @sender_thread.join + end + rescue Timeout::Error + total_size = @sender_queue&.size.to_i + + @aggregator_queue&.size.to_i + + @event_aggregator&.size.to_i - if total_size > 0 - logger.error "Timed out working agent thread on exit, dropping #{total_size} metrics" - else - logger.error "Timed out Instrumental Agent, exiting" - end + if total_size > 0 + logger.error "Timed out working agent thread on exit, dropping #{total_size} metrics" + else + logger.error "Timed out Instrumental Agent, exiting" end end end end @@ -411,24 +411,35 @@ return if running? return unless enabled? disconnect address = ipv4_address_for_host(@host, @port) if address - @pid = Process.pid + new_pid = if @pid != Process.pid + @pid = Process.pid + true + else + false + end + @sync_mutex = Mutex.new @failures = 0 @sockaddr_in = Socket.pack_sockaddr_in(@port, address) logger.info "Starting aggregator thread" if !@aggregator_thread&.alive? + if new_pid + @event_aggregator = nil + @aggregator_queue = Queue.new + end @aggregator_thread = Thread.new do run_aggregator_loop end end if !@sender_thread&.alive? logger.info "Starting sender thread" + @sender_queue = Queue.new if new_pid @sender_thread = Thread.new do run_sender_loop end end end @@ -490,12 +501,15 @@ rescue Timeout::Error ['forward', {}] end end if command_and_args - sync_resource = command_options && command_options[:sync_resource] case command_and_args when 'exit' + if !@event_aggregator.nil? + @sender_queue << @event_aggregator + @event_aggregator = nil + end logger.info "Exiting, #{@aggregator_queue.size} commands remain" return true when 'flush' if !@event_aggregator.nil? @sender_queue << @event_aggregator