Sha256: 3259420d6683b52902e4d9438e551c435c382a3e428e4a2780cd04be1a191b8e

Contents?: true

Size: 1.47 KB

Versions: 14

Compression:

Stored size: 1.47 KB

Contents

module Messaging
  # @private
  class ConsumerSupervisor
    include Messaging::Instrumentation

    def initialize
      @threads = Concurrent::Array.new
      Messaging.routes.define_consumers!
    end

    def start
      Concurrent.use_simple_logger
      Messaging.logger.info 'Consumers starting'
      @signal_to_stop = false
      @threads.clear
      @thread_pool = Concurrent::FixedThreadPool.new(consumers.size, auto_terminate: false)

      consumers.each do |consumer|
        @thread_pool.post do
          thread = Thread.current
          thread.abort_on_exception = true
          @threads << thread
          run_consumer(consumer)
        end
      end

      true
    end

    def stop
      return if @signal_to_stop

      instrument('consumer_supervisor.stop') do
        @signal_to_stop = true
        consumers.map { |consumer| Thread.new { consumer.stop } }.join
        @threads.select(&:alive?).each { |thread| thread&.wakeup }
        @thread_pool&.shutdown
        @thread_pool&.wait_for_termination(60)
        Messaging.logger.info 'Consumers stopped'
      end
    end

    def status
      consumers.map(&:log_current_status)
    end

    def consumers
      Messaging.routes.consumers
    end

    private

    def run_consumer(consumer)
      consumer.start
    rescue StandardError => e
      return if @signal_to_stop

      Messaging.logger.error "Consumer #{consumer.name} failed: #{e.message}"
      ExceptionHandler.call(e)
      sleep 2
      retry
    end
  end
end

Version data entries

14 entries across 14 versions & 1 rubygems

Version Path
messaging-4.0.12 lib/messaging/consumer_supervisor.rb
messaging-4.0.11 lib/messaging/consumer_supervisor.rb
messaging-4.0.10 lib/messaging/consumer_supervisor.rb
messaging-4.0.10.pre lib/messaging/consumer_supervisor.rb
messaging-4.0.9 lib/messaging/consumer_supervisor.rb
messaging-4.0.8 lib/messaging/consumer_supervisor.rb
messaging-4.0.7 lib/messaging/consumer_supervisor.rb
messaging-4.0.6 lib/messaging/consumer_supervisor.rb
messaging-4.0.5 lib/messaging/consumer_supervisor.rb
messaging-4.0.4.pre lib/messaging/consumer_supervisor.rb
messaging-4.0.3.pre lib/messaging/consumer_supervisor.rb
messaging-4.0.2.pre lib/messaging/consumer_supervisor.rb
messaging-4.0.1.pre lib/messaging/consumer_supervisor.rb
messaging-4.0.0.pre lib/messaging/consumer_supervisor.rb