Sha256: 866120626a023919e3caa6b92e8a33b9fa8ef497698802ababa6ea266f661174

Contents?: true

Size: 1.95 KB

Versions: 3

Compression:

Stored size: 1.95 KB

Contents

module Shoryuken
  # Coordinates the lifecycle of the per-group managers: builds one
  # Shoryuken::Manager for each entry in Shoryuken.groups, starts them, and
  # shuts them down either without a deadline (#stop) or with one (#stop!).
  class Launcher
    include Util

    # Builds one manager per configured group (see #create_managers).
    def initialize
      @managers = create_managers
    end

    # Runs the start callback, fires the :startup event and then starts every
    # manager in the background.
    def start
      logger.info { 'Starting' }

      start_callback
      start_managers
    end

    # Shutdown with a deadline: runs the stop callback and :shutdown event,
    # shuts the executor down and waits up to Shoryuken.options[:timeout] for
    # it to terminate. If it has not terminated by then, the executor is
    # killed.
    def stop!
      initiate_stop

      executor.shutdown

      return if executor.wait_for_termination(Shoryuken.options[:timeout])

      executor.kill
    end

    # Shutdown without a deadline: fires the :quiet event, runs the stop
    # callback and :shutdown event, tells every manager to stop dispatching
    # new work, waits for dispatching already in progress, and finally shuts
    # the executor down and waits (with no timeout) for it to terminate.
    def stop
      fire_event(:quiet, true)

      initiate_stop

      stop_new_dispatching
      await_dispatching_in_progress

      executor.shutdown
      executor.wait_for_termination
    end

    # @return [Boolean] true when every group in Shoryuken.groups has a
    #   manager and the first manager found for that group reports running?.
    def healthy?
      Shoryuken.groups.keys.all? do |group|
        manager = @managers.find { |m| m.group == group }
        manager && manager.running?
      end
    end

    private

    # Asks every manager to stop dispatching new work.
    def stop_new_dispatching
      @managers.each(&:stop_new_dispatching)
    end

    # Calls await_dispatching_in_progress on every manager, in order.
    def await_dispatching_in_progress
      @managers.each(&:await_dispatching_in_progress)
    end

    # Memoized executor handed to every manager: Shoryuken.launcher_executor
    # when one is configured, otherwise Concurrent.global_io_executor.
    def executor
      @_executor ||= Shoryuken.launcher_executor || Concurrent.global_io_executor
    end

    # Starts each manager inside its own Concurrent::Future.
    #
    # Nothing reads these futures afterwards, so an exception escaping
    # Manager#start would be captured by the future and never surface. It is
    # logged here and then re-raised, which leaves the future rejected exactly
    # as it was before the logging was added.
    def start_managers
      @managers.each do |manager|
        Concurrent::Future.execute do
          # Explicit begin/rescue keeps this valid on Rubies that do not allow
          # a bare rescue inside a do/end block.
          begin
            manager.start
          rescue StandardError => e
            logger.error do
              "Manager for group '#{manager.group}' failed to start: #{e.class}: #{e.message}"
            end
            raise
          end
        end
      end
    end

    # Shared first step of #stop and #stop!: logs the shutdown and runs the
    # stop callback plus the :shutdown event.
    def initiate_stop
      logger.info { 'Shutting down' }

      stop_callback
    end

    # Calls Shoryuken.start_callback when one is set, then fires :startup.
    def start_callback
      if (callback = Shoryuken.start_callback)
        logger.debug { 'Calling start_callback' }
        callback.call
      end

      fire_event(:startup)
    end

    # Calls Shoryuken.stop_callback when one is set, then fires :shutdown.
    def stop_callback
      if (callback = Shoryuken.stop_callback)
        logger.debug { 'Calling stop_callback' }
        callback.call
      end

      fire_event(:shutdown, true)
    end

    # Builds a Shoryuken::Manager for each (group, options) pair in
    # Shoryuken.groups. Each manager gets its own fetcher, a polling strategy
    # built from the group's queues and delay, the group's concurrency
    # setting, and the shared executor.
    def create_managers
      Shoryuken.groups.map do |group, options|
        Shoryuken::Manager.new(
          group,
          Shoryuken::Fetcher.new(group),
          Shoryuken.polling_strategy(group).new(options[:queues], Shoryuken.delay(group)),
          options[:concurrency],
          executor
        )
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
shoryuken-6.0.0 lib/shoryuken/launcher.rb
shoryuken-5.3.2 lib/shoryuken/launcher.rb
shoryuken-5.3.1 lib/shoryuken/launcher.rb