Sha256: 3464cd010866c632c4715247601116fb4abd6b7d636038f63e9e07519a0f69fa
Contents?: true
Size: 1.64 KB
Versions: 1
Compression:
Stored size: 1.64 KB
Contents
require 'sidekiq' require 'sidekiq/util' require 'rekiq/contract' require 'rekiq/scheduler' module Rekiq module Middleware class WorkOverseer include ::Sidekiq::Util def call(worker, msg, queue) return yield unless msg.key?('rq:ctr') @worker = worker @worker_name = worker.class.name @msg = msg @queue = queue @contract = Contract.from_hash(msg['rq:ctr']) set_rekiq_worker_attributes if cancel_worker? return logger.info "worker #{@worker_name} was canceled" end if msg.key?('rq:sdl') msg.delete('rq:sdl') else return yield end begin reschedule unless @contract.schedule_post_work? yield ensure reschedule if @contract.schedule_post_work? end end protected def set_rekiq_worker_attributes @worker.scheduled_work_time = Time.at(@msg['rq:at'].to_f) @worker.estimated_next_work_time = @contract.next_work_time(@worker.scheduled_work_time) end def cancel_worker? @worker.cancel_rekiq_worker?(*@contract.cancel_args) end def reschedule jid, work_time = Rekiq::Scheduler .new(@worker_name, @queue, @msg['args'], @contract) .schedule_next_work(Time.at(@msg['rq:at'].to_f)) unless jid.nil? logger.info "worker #{@worker_name} scheduled for " \ "#{work_time} with jid #{jid}" else logger.info 'recurrence terminated, worker terminated' end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
rekiq-1.2.0 | lib/rekiq/middleware/work_overseer.rb |