require 'timeout'
require 'active_support/dependencies'
require 'active_support/core_ext/numeric/time'
require 'active_support/core_ext/class/attribute_accessors'
require 'active_support/hash_with_indifferent_access'
require 'active_support/core_ext/hash/indifferent_access'
require 'logger'
require 'benchmark'

module Delayed
  # Polls the configured backend for jobs, runs them, and reschedules or
  # fails them when they raise. Configuration lives in class-level accessors
  # so it is shared by every worker instance in the process.
  class Worker # rubocop:disable ClassLength
    DEFAULT_LOG_LEVEL        = 'info'.freeze
    DEFAULT_SLEEP_DELAY      = 5
    DEFAULT_MAX_ATTEMPTS     = 25
    DEFAULT_MAX_RUN_TIME     = 4.hours
    DEFAULT_DEFAULT_PRIORITY = 0
    DEFAULT_DELAY_JOBS       = true
    DEFAULT_QUEUES           = [].freeze
    # Frozen empty hash; .queue_attributes= makes its own indifferent-access
    # copy, so the shared default is never mutated.
    DEFAULT_QUEUE_ATTRIBUTES = ActiveSupport::HashWithIndifferentAccess.new.freeze
    DEFAULT_READ_AHEAD       = 5

    cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
                   :default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
                   :read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete,
                   :default_log_level

    # Named queue into which jobs are enqueued by default
    cattr_accessor :default_queue_name

    cattr_reader :backend, :queue_attributes

    # name_prefix is ignored if name is set directly
    attr_accessor :name_prefix

    # Restores the DEFAULT_* values for the resettable settings and discards
    # the memoized lifecycle so the next .lifecycle call rebuilds it.
    def self.reset
      self.default_log_level = DEFAULT_LOG_LEVEL
      self.sleep_delay       = DEFAULT_SLEEP_DELAY
      self.max_attempts      = DEFAULT_MAX_ATTEMPTS
      self.max_run_time      = DEFAULT_MAX_RUN_TIME
      self.default_priority  = DEFAULT_DEFAULT_PRIORITY
      self.delay_jobs        = DEFAULT_DELAY_JOBS
      self.queues            = DEFAULT_QUEUES
      self.queue_attributes  = DEFAULT_QUEUE_ATTRIBUTES
      self.read_ahead        = DEFAULT_READ_AHEAD
      @lifecycle             = nil
    end

    # Add or remove plugins in this list before the worker is instantiated
    self.plugins = [Delayed::Plugins::ClearLocks]

    # By default failed jobs are destroyed after too many attempts. If you want to keep them around
    # (perhaps to inspect the reason for the failure), set this to false.
    self.destroy_failed_jobs = true

    # By default, Signals INT and TERM set @exit, and the worker exits upon completion of the current job.
    # If you would prefer to raise a SignalException and exit immediately you can use this.
    # Be aware daemons uses TERM to stop and restart
    # false - No exceptions will be raised
    # :term - Will only raise an exception on TERM signals but INT will wait for the current job to finish
    # true - Will raise an exception on TERM and INT
    cattr_accessor :raise_signal_exceptions
    self.raise_signal_exceptions = false

    # Sets the job backend. A Symbol is resolved by requiring the matching
    # serialization and backend files and constantizing
    # Delayed::Backend::<Name>::Job; anything else is used as given.
    # The result is also published as ::Delayed::Job.
    def self.backend=(backend)
      if backend.is_a? Symbol
        require "delayed/serialization/#{backend}"
        require "delayed/backend/#{backend}"
        backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
      end
      @@backend = backend # rubocop:disable ClassVars
      # Re-assigning an existing constant warns; silence it.
      silence_warnings { ::Delayed.const_set(:Job, backend) }
    end

    # Stores +val+ converted via with_indifferent_access.
    # rubocop:disable ClassVars
    def self.queue_attributes=(val)
      @@queue_attributes = val.with_indifferent_access
    end

    # Deprecated no-op kept for backwards compatibility; only emits a warning.
    def self.guess_backend
      warn '[DEPRECATION] guess_backend is deprecated. Please remove it from your code.'
    end

    # Records every currently open File (first call only) so .after_fork can
    # reopen them, then delegates to the backend's before_fork hook.
    def self.before_fork
      unless @files_to_reopen
        @files_to_reopen = []
        ObjectSpace.each_object(File) do |file|
          @files_to_reopen << file unless file.closed?
        end
      end

      backend.before_fork
    end

    # Reopens the files recorded by .before_fork in append mode with sync
    # enabled, then delegates to the backend's after_fork hook.
    # Expects .before_fork to have populated @files_to_reopen.
    def self.after_fork
      # Re-open file handles
      @files_to_reopen.each do |file|
        begin
          file.reopen file.path, 'a+'
          file.sync = true
        rescue ::Exception # rubocop:disable HandleExceptions, RescueException
          # Best effort: a handle that cannot be reopened is skipped.
        end
      end
      backend.after_fork
    end

    # Returns the memoized lifecycle, building it on first use.
    def self.lifecycle
      # In case a worker has not been set up, job enqueueing needs a lifecycle.
      setup_lifecycle unless @lifecycle
      @lifecycle
    end

    # Builds a fresh lifecycle and instantiates every registered plugin.
    # NOTE(review): plugins presumably attach their callbacks to the new
    # lifecycle when instantiated -- confirm against the plugin base class.
    def self.setup_lifecycle
      @lifecycle = Delayed::Lifecycle.new
      plugins.each { |klass| klass.new }
    end

    # Truthy when ActionDispatch::Reloader is defined and Rails is configured
    # with cache_classes == false.
    def self.reload_app?
      defined?(ActionDispatch::Reloader) && Rails.application.config.cache_classes == false
    end

    # Evaluates the delay_jobs setting for +job+. A Proc is called with the
    # job when it takes exactly one argument and with no arguments otherwise;
    # any other value is returned as-is.
    def self.delay_job?(job)
      if delay_jobs.is_a?(Proc)
        delay_jobs.arity == 1 ? delay_jobs.call(job) : delay_jobs.call
      else
        delay_jobs
      end
    end

    # +options+ may carry :quiet (defaults to true) plus any of
    # :min_priority, :max_priority, :sleep_delay, :read_ahead, :queues and
    # :exit_on_complete, which are written to the class-level settings.
    def initialize(options = {})
      @quiet = options.key?(:quiet) ? options[:quiet] : true
      @failed_reserve_count = 0

      [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
        self.class.send("#{option}=", options[option]) if options.key?(option)
      end

      # Reset lifecycle on the offhand chance that something lazily
      # triggered its creation before all plugins had been registered.
      self.class.setup_lifecycle
    end

    # Every worker has a unique name which by default is the pid of the process. There are some
    # advantages to overriding this with something which survives worker restarts: Workers can
    # safely resume working on tasks which are locked by themselves. The worker will assume that
    # it crashed before.
    def name
      return @name unless @name.nil?
      # Falls back to a pid-only name if the host name lookup raises.
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
    end

    # Sets the name of the worker.
    # Setting the name to nil will reset the default worker name
    attr_writer :name

    # Installs TERM/INT handlers and runs the main loop: work off a batch,
    # then exit, sleep or continue depending on how many jobs were processed
    # and whether a stop was requested.
    def start # rubocop:disable CyclomaticComplexity, PerceivedComplexity
      trap('TERM') do
        # Logging is done from a separate thread rather than directly inside
        # the signal handler.
        Thread.new { say 'Exiting...' }
        stop
        raise SignalException, 'TERM' if self.class.raise_signal_exceptions
      end

      trap('INT') do
        Thread.new { say 'Exiting...' }
        stop
        raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
      end

      say 'Starting job worker'

      self.class.lifecycle.run_callbacks(:execute, self) do
        loop do
          self.class.lifecycle.run_callbacks(:loop, self) do
            @realtime = Benchmark.realtime do
              @result = work_off
            end
          end

          # @result is [successes, failures] as returned by #work_off.
          count = @result[0] + @result[1]

          if count.zero?
            if self.class.exit_on_complete
              say 'No more jobs available. Exiting'
              break
            elsif !stop?
              sleep(self.class.sleep_delay)
              reload!
            end
          else
            say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last)
          end

          break if stop?
        end
      end
    end

    # Requests that the worker exit; checked by #start and #work_off.
    def stop
      @exit = true
    end

    # True once #stop has been called.
    def stop?
      !!@exit
    end

    # Do num jobs and return stats on success/failure.
    # Exit early if interrupted.
    # Returns [success_count, failure_count].
    def work_off(num = 100)
      success = 0
      failure = 0

      num.times do
        case reserve_and_run_one_job
        when true
          success += 1
        when false
          failure += 1
        else
          break # leave if no work could be done
        end
        break if stop? # leave if we're exiting
      end

      [success, failure]
    end

    # Invokes +job+ under a timeout of max_run_time(job) seconds and destroys
    # it on success. Returns true on success and false when the job raised a
    # non-deserialization error; on DeserializationError the job is failed
    # permanently and the result of #failed is returned.
    def run(job)
      job_say job, 'RUNNING'
      runtime = Benchmark.realtime do
        Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) { job.invoke_job }
        job.destroy
      end
      job_say job, format('COMPLETED after %.4f', runtime)
      return true # did work
    rescue DeserializationError => error
      job_say job, "FAILED permanently with #{error.class.name}: #{error.message}", 'error'

      job.error = error
      failed(job)
    rescue Exception => error # rubocop:disable RescueException
      self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, error) }
      return false # work failed
    end

    # Reschedule the job in the future (when a job fails).
    # Uses an exponential scale depending on the number of failed attempts.
    # Increments job.attempts; once it reaches max_attempts(job) the job is
    # failed permanently instead of being rescheduled.
    def reschedule(job, time = nil)
      if (job.attempts += 1) < max_attempts(job)
        time ||= job.reschedule_at
        job.run_at = time
        job.unlock!
        job.save!
      else
        job_say job, "FAILED permanently because of #{job.attempts} consecutive failures", 'error'
        failed(job)
      end
    end

    # Runs the job's :failure hook inside the :failure lifecycle callbacks,
    # logging (not raising) any StandardError from the hook, and then always
    # either destroys the job or marks it failed depending on
    # job.destroy_failed_jobs?.
    def failed(job)
      self.class.lifecycle.run_callbacks(:failure, self, job) do
        begin
          job.hook(:failure)
        rescue => error
          say "Error when running failure callback: #{error}", 'error'
          say error.backtrace.join("\n"), 'error'
        ensure
          job.destroy_failed_jobs? ? job.destroy : job.fail!
        end
      end
    end

    # Logs +text+ prefixed with the job's name, id and (if any) queue.
    def job_say(job, text, level = default_log_level)
      text = "Job #{job.name} (id=#{job.id})#{say_queue(job.queue)} #{text}"
      say text, level
    end

    # Prefixes +text+ with the worker name, prints it unless quiet, and sends
    # it with a timestamp to the logger (if one is set) at +level+. A
    # non-String level is mapped to the matching Logger::Severity name.
    def say(text, level = default_log_level)
      text = "[Worker(#{name})] #{text}"
      puts text unless @quiet
      return unless logger
      # TODO: Deprecate use of Fixnum log levels
      unless level.is_a?(String)
        level = Logger::Severity.constants.detect { |i| Logger::Severity.const_get(i) == level }.to_s.downcase
      end
      logger.send(level, "#{Time.now.strftime('%FT%T%z')}: #{text}")
    end

    # Per-job max_attempts, falling back to the class-level setting.
    def max_attempts(job)
      job.max_attempts || self.class.max_attempts
    end

    # Per-job max_run_time, falling back to the class-level setting.
    def max_run_time(job)
      job.max_run_time || self.class.max_run_time
    end

  protected

    # Log fragment naming the queue, or nil when the job has no queue.
    def say_queue(queue)
      " (queue=#{queue})" if queue
    end

    # Records +error+ on the job, logs the failure and reschedules the job.
    def handle_failed_job(job, error)
      job.error = error
      job_say job, "FAILED (#{job.attempts} prior attempts) with #{error.class.name}: #{error.message}", 'error'
      reschedule(job)
    end

    # Run the next job we can get an exclusive lock on.
    # If no jobs are left we return nil
    def reserve_and_run_one_job
      job = reserve_job
      self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) } if job
    end

    # Asks the backend to reserve a job for this worker. Any error is logged
    # and handed to the backend's recover_from; returns nil in that case.
    # Raises FatalBackendError after 10 consecutive failed reservations.
    def reserve_job
      job = Delayed::Job.reserve(self)
      @failed_reserve_count = 0
      job
    rescue ::Exception => error # rubocop:disable RescueException
      say "Error while reserving job: #{error}"
      Delayed::Job.recover_from(error)
      @failed_reserve_count += 1
      raise FatalBackendError if @failed_reserve_count >= 10
      nil
    end

    # Reloads application code when .reload_app? is truthy, using
    # Rails.application.reloader when ActiveSupport::Reloader is defined and
    # ActionDispatch::Reloader otherwise.
    def reload!
      return unless self.class.reload_app?
      if defined?(ActiveSupport::Reloader)
        Rails.application.reloader.reload!
      else
        ActionDispatch::Reloader.cleanup!
        ActionDispatch::Reloader.prepare!
      end
    end
  end
end

Delayed::Worker.reset