lib/sidekiq/web_custom/processor.rb in sidekiq-web_custom-0.4.1 vs lib/sidekiq/web_custom/processor.rb in sidekiq-web_custom-0.5.0

- old
+ new

@@ -2,53 +2,54 @@ module Sidekiq module WebCustom class Processor < ::Sidekiq::Processor - def self.execute(max:, queue:, options: Sidekiq.options) + def self.execute(max:, queue:, options: Sidekiq) __processor__(queue: queue, options: options).__execute(max: max) end - def self.execute_job(job:, options: Sidekiq.options) + def self.execute_job(job:, options: Sidekiq) __processor__(queue: job.queue, options: options).__execute_job(job: job) rescue StandardError => _ false # error gets loggged downstream end - def self.__processor__(queue:, options: Sidekiq.options) - options_temp = options.clone + def self.__processor__(queue:, options: Sidekiq) + options_temp = options.dup queue = queue.is_a?(String) ? Sidekiq::Queue.new(queue) : queue options_temp[:queues] = [queue.name] klass = options_temp[:fetch]&.class || BasicFetch options_temp[:fetch] = klass.new(options_temp) - new(manager: nil, options: options_temp, queue: queue) + + new(options: options_temp, queue: queue) end - def initialize(manager:, options:, queue:) + def initialize(options:, queue:) @__queue = queue @__basic_fetch = options[:fetch].class == BasicFetch - super(manager, options) + super(options) end def __execute_job(job:) queue_name = "queue:#{job.queue}" work_unit = Sidekiq::BasicFetch::UnitOfWork.new(queue_name, job.item.to_json) begin - Sidekiq.logger.info "Manually processing individual work unit for #{work_unit.queue_name}" + logger.info "Manually processing individual work unit for #{work_unit.queue_name}" process(work_unit) rescue StandardError => e - Sidekiq.logger.error "Manually processed work unit failed with #{e.message}. Work unit will not be dequeued" + logger.error "Manually processed work unit failed with #{e.message}. Work unit will not be dequeued" raise e end begin job.delete - Sidekiq.logger.info { "Manually processed work unit sucessfully dequeued." } + logger.info { "Manually processed work unit sucessfully dequeued." } rescue StandardError => e - Sidekiq.logger.fatal "Manually processed work unit failed to be dequeued. #{e.message}." + logger.fatal "Manually processed work unit failed to be dequeued. #{e.message}." raise e end true end @@ -57,24 +58,24 @@ count = 0 max.times do break if @__queue.size <= 0 if Thread.current[Sidekiq::WebCustom::BREAK_BIT] - Sidekiq.logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{count} of attempted #{max}" + logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{count} of attempted #{max}" break end - Sidekiq.logger.info { "Manually processing next item in queue:[#{@__queue.name}]" } + logger.info { "Manually processing next item in queue:[#{@__queue.name}]" } process_one count += 1 - end + count rescue Exception => ex if @job && @__basic_fetch - Sidekiq.logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}" + logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}" end - Sidekiq.logger.warn "Manual execution has terminated. Received error [#{ex.message}]" + logger.warn "Manual execution has terminated. Received error [#{ex.message}]" return count end end end end