lib/sidekiq/web_custom/processor.rb in sidekiq-web_custom-0.4.1 vs lib/sidekiq/web_custom/processor.rb in sidekiq-web_custom-0.5.0
- old
+ new
@@ -2,53 +2,54 @@
module Sidekiq
module WebCustom
class Processor < ::Sidekiq::Processor
- def self.execute(max:, queue:, options: Sidekiq.options)
+ def self.execute(max:, queue:, options: Sidekiq)
__processor__(queue: queue, options: options).__execute(max: max)
end
- def self.execute_job(job:, options: Sidekiq.options)
+ def self.execute_job(job:, options: Sidekiq)
__processor__(queue: job.queue, options: options).__execute_job(job: job)
rescue StandardError => _
false # error gets loggged downstream
end
- def self.__processor__(queue:, options: Sidekiq.options)
- options_temp = options.clone
+ def self.__processor__(queue:, options: Sidekiq)
+ options_temp = options.dup
queue = queue.is_a?(String) ? Sidekiq::Queue.new(queue) : queue
options_temp[:queues] = [queue.name]
klass = options_temp[:fetch]&.class || BasicFetch
options_temp[:fetch] = klass.new(options_temp)
- new(manager: nil, options: options_temp, queue: queue)
+
+ new(options: options_temp, queue: queue)
end
- def initialize(manager:, options:, queue:)
+ def initialize(options:, queue:)
@__queue = queue
@__basic_fetch = options[:fetch].class == BasicFetch
- super(manager, options)
+ super(options)
end
def __execute_job(job:)
queue_name = "queue:#{job.queue}"
work_unit = Sidekiq::BasicFetch::UnitOfWork.new(queue_name, job.item.to_json)
begin
- Sidekiq.logger.info "Manually processing individual work unit for #{work_unit.queue_name}"
+ logger.info "Manually processing individual work unit for #{work_unit.queue_name}"
process(work_unit)
rescue StandardError => e
- Sidekiq.logger.error "Manually processed work unit failed with #{e.message}. Work unit will not be dequeued"
+ logger.error "Manually processed work unit failed with #{e.message}. Work unit will not be dequeued"
raise e
end
begin
job.delete
- Sidekiq.logger.info { "Manually processed work unit sucessfully dequeued." }
+ logger.info { "Manually processed work unit sucessfully dequeued." }
rescue StandardError => e
- Sidekiq.logger.fatal "Manually processed work unit failed to be dequeued. #{e.message}."
+ logger.fatal "Manually processed work unit failed to be dequeued. #{e.message}."
raise e
end
true
end
@@ -57,24 +58,24 @@
count = 0
max.times do
break if @__queue.size <= 0
if Thread.current[Sidekiq::WebCustom::BREAK_BIT]
- Sidekiq.logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{count} of attempted #{max}"
+ logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{count} of attempted #{max}"
break
end
- Sidekiq.logger.info { "Manually processing next item in queue:[#{@__queue.name}]" }
+ logger.info { "Manually processing next item in queue:[#{@__queue.name}]" }
process_one
count += 1
-
end
+
count
rescue Exception => ex
if @job && @__basic_fetch
- Sidekiq.logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}"
+ logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}"
end
- Sidekiq.logger.warn "Manual execution has terminated. Received error [#{ex.message}]"
+ logger.warn "Manual execution has terminated. Received error [#{ex.message}]"
return count
end
end
end
end