Sha256: e6499eb0eb3740f6ff8464f88f66cc7647a2e55cac502854c1124dea8cde8b8e
Contents?: true
Size: 1.4 KB
Versions: 20
Compression:
Stored size: 1.4 KB
Contents
module Messaging
  # Runs every consumer returned by Messaging.routes.consumers on its own
  # thread of a fixed-size thread pool, restarts a consumer whose #start
  # raises a StandardError, and shuts all of them down on request.
  #
  # @private
  class ConsumerSupervisor
    include Messaging::Instrumentation

    # Creates the (initially empty) list of worker threads and asks the
    # routes to define their consumers.
    def initialize
      @threads = Concurrent::Array.new
      Messaging.routes.define_consumers!
    end

    # Posts one long-running job per consumer to a freshly created pool that
    # has exactly as many threads as there are consumers.
    #
    # @return [true]
    def start
      Concurrent.use_simple_logger
      Messaging.logger.info 'Consumers starting'
      @signal_to_stop = false
      @threads.clear
      @thread_pool = Concurrent::FixedThreadPool.new(consumers.size, auto_terminate: false)
      consumers.each do |consumer|
        @thread_pool.post do
          thread = Thread.current
          thread.abort_on_exception = true
          # Remember the worker thread so #stop can wake it if it is sleeping.
          @threads << thread
          run_consumer(consumer)
        end
      end
      true
    end

    # Stops all consumers, then shuts the thread pool down, waiting up to
    # 60 seconds for it to terminate. Does nothing when a stop has already
    # been signalled since the last #start.
    #
    # An exception raised by a consumer's #stop propagates to the caller,
    # but only after the thread pool has been shut down.
    #
    # @return [void]
    def stop
      return if @signal_to_stop

      instrument('consumer_supervisor.stop') do
        @signal_to_stop = true
        begin
          stop_consumers
        ensure
          shutdown_thread_pool
        end
        Messaging.logger.info 'Consumers stopped'
      end
    end

    # @return [Array] the result of calling #log_current_status on every consumer
    def status
      consumers.map(&:log_current_status)
    end

    # @return the consumers defined by Messaging.routes
    def consumers
      Messaging.routes.consumers
    end

    private

    # Calls #stop on every consumer in parallel and waits for all of the
    # calls to return.
    def stop_consumers
      stoppers = consumers.map { |consumer| Thread.new { consumer.stop } }
      # Thread#join on each thread; Array#join would merely build a String
      # out of the Thread objects and return without waiting for any of them.
      stoppers.each(&:join)
    end

    # Wakes worker threads (one may be sleeping in the retry back-off of
    # #run_consumer) and shuts the pool down. Safe to call when #start has
    # never been called, in which case there is no pool.
    def shutdown_thread_pool
      @threads.select(&:alive?).each(&:wakeup)
      @thread_pool&.shutdown
      @thread_pool&.wait_for_termination(60)
    end

    # Runs consumer.start, and on a StandardError reports it, waits two
    # seconds and starts the consumer again. Retries are intentionally
    # unbounded; the loop ends only when a stop has been signalled.
    def run_consumer(consumer)
      consumer.start
    rescue StandardError => e
      return if @signal_to_stop

      ExceptionHandler.call(e)
      sleep 2
      retry
    end
  end
end
Version data entries
20 entries across 20 versions & 1 rubygems