Sha256: 8429c09f4275ad78d7c896b8b0571bcdb32e3a7df1a6daf48292d8afd664564c
Contents?: true
Size: 1.59 KB
Versions: 6
Compression:
Stored size: 1.59 KB
Contents
require 'sidekiq/actor' require 'sidekiq/manager' require 'sidekiq/fetch' require 'sidekiq/scheduled' module Sidekiq # The Launcher is a very simple Actor whose job is to # start, monitor and stop the core Actors in Sidekiq. # If any of these actors die, the Sidekiq process exits # immediately. class Launcher include Actor include Util trap_exit :actor_died attr_reader :manager, :poller, :fetcher def initialize(options) @manager = Sidekiq::Manager.new_link options @poller = Sidekiq::Scheduled::Poller.new_link @fetcher = Sidekiq::Fetcher.new_link @manager, options @manager.fetcher = @fetcher @done = false @options = options end def actor_died(actor, reason) return if @done Sidekiq.logger.warn("Sidekiq died due to the following error, cannot recover, process exiting") handle_exception(reason) exit(1) end def run watchdog('Launcher#run') do manager.async.start poller.async.poll(true) end end def stop watchdog('Launcher#stop') do @done = true Sidekiq::Fetcher.done! fetcher.terminate if fetcher.alive? poller.terminate if poller.alive? manager.async.stop(:shutdown => true, :timeout => @options[:timeout]) manager.wait(:shutdown) # Requeue everything in case there was a worker who grabbed work while stopped Sidekiq::Fetcher.strategy.bulk_requeue([], @options) end end def procline(tag) $0 = manager.procline(tag) manager.after(5) { procline(tag) } end end end
Version data entries
6 entries across 6 versions & 1 rubygems