lib/shoryuken/launcher.rb in shoryuken-3.0.11 vs lib/shoryuken/launcher.rb in shoryuken-3.1.0
- old
+ new
@@ -1,19 +1,98 @@
module Shoryuken
class Launcher
include Util
def initialize
- @manager = Shoryuken::Manager.new(Shoryuken::Fetcher.new,
- Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
+ @managers = create_managers
+ @shutdowning = Concurrent::AtomicBoolean.new(false)
end
- def stop(options = {})
- @manager.stop(shutdown: !options[:shutdown].nil?,
- timeout: Shoryuken.options[:timeout])
+ def start
+ logger.info { 'Starting' }
+
+ start_callback
+ start_managers
end
- def run
- @manager.start
+ def stop!
+ initiate_stop
+
+ executor.shutdown
+
+ return if executor.wait_for_termination(Shoryuken.options[:timeout])
+
+ executor.kill
+ end
+
+ def stop
+ fire_event(:quiet, true)
+
+ initiate_stop
+
+ executor.shutdown
+ executor.wait_for_termination
+ end
+
+ private
+
+ def executor
+ Concurrent.global_io_executor
+ end
+
+ def start_managers
+ @managers.each do |manager|
+ Concurrent::Promise.execute { manager.start }.rescue do |ex|
+ log_manager_failure(ex)
+ start_soft_shutdown
+ end
+ end
+ end
+
+ def start_soft_shutdown
+ Process.kill('USR1', Process.pid) if @shutdowning.make_true
+ end
+
+ def log_manager_failure(ex)
+ return unless ex
+
+ logger.error { "Manager failed: #{ex.message}" }
+ logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil?
+ end
+
+ def initiate_stop
+ logger.info { 'Shutting down' }
+
+ @managers.each(&:stop)
+
+ stop_callback
+ end
+
+ def start_callback
+ if (callback = Shoryuken.start_callback)
+ logger.debug { 'Calling start_callback' }
+ callback.call
+ end
+
+ fire_event(:startup)
+ end
+
+ def stop_callback
+ if (callback = Shoryuken.stop_callback)
+ logger.debug { 'Calling stop_callback' }
+ callback.call
+ end
+
+ fire_event(:shutdown, true)
+ end
+
+ def create_managers
+ Shoryuken.groups.map do |group, options|
+ Shoryuken::Manager.new(
+ Shoryuken::Fetcher.new(group),
+ Shoryuken.polling_strategy(group).new(options[:queues]),
+ options[:concurrency]
+ )
+ end
end
end
end