Sha256: 3259420d6683b52902e4d9438e551c435c382a3e428e4a2780cd04be1a191b8e
Contents?: true
Size: 1.47 KB
Versions: 14
Compression:
Stored size: 1.47 KB
Contents
module Messaging # @private class ConsumerSupervisor include Messaging::Instrumentation def initialize @threads = Concurrent::Array.new Messaging.routes.define_consumers! end def start Concurrent.use_simple_logger Messaging.logger.info 'Consumers starting' @signal_to_stop = false @threads.clear @thread_pool = Concurrent::FixedThreadPool.new(consumers.size, auto_terminate: false) consumers.each do |consumer| @thread_pool.post do thread = Thread.current thread.abort_on_exception = true @threads << thread run_consumer(consumer) end end true end def stop return if @signal_to_stop instrument('consumer_supervisor.stop') do @signal_to_stop = true consumers.map { |consumer| Thread.new { consumer.stop } }.join @threads.select(&:alive?).each { |thread| thread&.wakeup } @thread_pool&.shutdown @thread_pool&.wait_for_termination(60) Messaging.logger.info 'Consumers stopped' end end def status consumers.map(&:log_current_status) end def consumers Messaging.routes.consumers end private def run_consumer(consumer) consumer.start rescue StandardError => e return if @signal_to_stop Messaging.logger.error "Consumer #{consumer.name} failed: #{e.message}" ExceptionHandler.call(e) sleep 2 retry end end end
Version data entries
14 entries across 14 versions & 1 rubygems