Sha256: e6499eb0eb3740f6ff8464f88f66cc7647a2e55cac502854c1124dea8cde8b8e

Contents?: true

Size: 1.4 KB

Versions: 20

Compression:

Stored size: 1.4 KB

Contents

module Messaging
  # Runs every consumer defined by the routes as a task on a fixed-size
  # thread pool and restarts a consumer whenever it raises a StandardError,
  # until {#stop} is called.
  #
  # @private
  class ConsumerSupervisor
    include Messaging::Instrumentation

    def initialize
      @threads = Concurrent::Array.new
      Messaging.routes.define_consumers!
    end

    # Posts one long-running task per consumer to a freshly created pool
    # sized to the number of consumers, and records the thread each task
    # runs on so that {#stop} can wake it.
    #
    # @return [true]
    def start
      Concurrent.use_simple_logger
      Messaging.logger.info 'Consumers starting'
      @signal_to_stop = false
      @threads.clear
      @thread_pool = Concurrent::FixedThreadPool.new(consumers.size, auto_terminate: false)

      consumers.each do |consumer|
        @thread_pool.post do
          thread = Thread.current
          thread.abort_on_exception = true
          @threads << thread
          run_consumer(consumer)
        end
      end

      true
    end

    # Asks every consumer to stop, waits for those stop calls to return,
    # wakes the recorded worker threads and shuts the pool down, waiting up
    # to 60 seconds for it to terminate. Does nothing when a stop has already
    # been signalled.
    def stop
      return if @signal_to_stop

      instrument('consumer_supervisor.stop') do
        @signal_to_stop = true
        # Stop all consumers in parallel and wait for each stop call to
        # finish. Thread#join is required here: Array#join would merely
        # build a string out of the Thread objects without waiting.
        consumers.map { |consumer| Thread.new { stop_consumer(consumer) } }.each(&:join)
        # Interrupt workers that are sleeping (e.g. in the retry back-off of
        # #run_consumer) so they observe the stop flag right away.
        @threads.select(&:alive?).each(&:wakeup)
        @thread_pool&.shutdown
        @thread_pool&.wait_for_termination(60)
        Messaging.logger.info 'Consumers stopped'
      end
    end

    # Calls +log_current_status+ on every consumer.
    #
    # @return [Array] the values returned by each consumer
    def status
      consumers.map(&:log_current_status)
    end

    # @return the consumers exposed by +Messaging.routes+
    def consumers
      Messaging.routes.consumers
    end

    private

    # Stops a single consumer. Errors are handed to ExceptionHandler instead
    # of propagating, so that joining the stopping thread cannot abort the
    # rest of the shutdown sequence.
    def stop_consumer(consumer)
      consumer.stop
    rescue StandardError => e
      ExceptionHandler.call(e)
    end

    # Runs +consumer+ and keeps restarting it after a StandardError, pausing
    # two seconds between attempts. The retry is intentionally unbounded; it
    # ends only once a stop has been signalled.
    def run_consumer(consumer)
      consumer.start
    rescue StandardError => e
      return if @signal_to_stop

      ExceptionHandler.call(e)
      sleep 2
      # #stop may have woken us out of the sleep above; do not restart the
      # consumer in that case.
      return if @signal_to_stop

      retry
    end
  end
end

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
messaging-3.8.2 lib/messaging/consumer_supervisor.rb
messaging-3.8.1 lib/messaging/consumer_supervisor.rb
messaging-3.8.0 lib/messaging/consumer_supervisor.rb
messaging-3.7.3 lib/messaging/consumer_supervisor.rb
messaging-3.7.2 lib/messaging/consumer_supervisor.rb
messaging-3.7.1 lib/messaging/consumer_supervisor.rb
messaging-3.7.0 lib/messaging/consumer_supervisor.rb
messaging-3.6.2 lib/messaging/consumer_supervisor.rb
messaging-3.6.1 lib/messaging/consumer_supervisor.rb
messaging-3.6.0 lib/messaging/consumer_supervisor.rb
messaging-3.5.7 lib/messaging/consumer_supervisor.rb
messaging-3.5.6 lib/messaging/consumer_supervisor.rb
messaging-3.5.5 lib/messaging/consumer_supervisor.rb
messaging-3.5.4 lib/messaging/consumer_supervisor.rb
messaging-3.5.3 lib/messaging/consumer_supervisor.rb
messaging-3.5.2 lib/messaging/consumer_supervisor.rb
messaging-3.5.1 lib/messaging/consumer_supervisor.rb
messaging-3.4.3 lib/messaging/consumer_supervisor.rb
messaging-3.4.2 lib/messaging/consumer_supervisor.rb
messaging-3.4.1 lib/messaging/consumer_supervisor.rb