Sha256: 01cd6ff054b34221ae75b320cf623e30518878d755fed61d6697b377ed36f244
Contents?: true
Size: 1.83 KB
Versions: 3
Compression:
Stored size: 1.83 KB
Contents
module Sidekiq
  module Monitor
    # Persists job lifecycle events (queued, running, failed, complete) by
    # creating and updating Sidekiq::Monitor::Job records keyed by jid.
    class Processor
      # Records that a job has been enqueued.
      #
      # @param worker_class [Class] worker class; may optionally respond to
      #   +job_name+ to supply a display name
      # @param item [Hash] job payload; the 'jid', 'args' and 'retry' keys are read
      # @param queue [Object] queue identifier stored on the record
      # @return [Object] whatever Job.find_or_create_by_jid returns
      def queue(worker_class, item, queue)
        args = item['args']
        name = job_name(worker_class, args)

        Sidekiq::Monitor::Job.find_or_create_by_jid(
          jid: item['jid'],
          queue: queue,
          class_name: worker_class.name,
          args: args,
          retry: item['retry'],
          enqueued_at: DateTime.now,
          status: 'queued',
          name: name
        )
      end

      # Marks a job as running. If no record exists yet for the jid, a new one
      # is built from the message before being updated.
      #
      # @param worker [Object] worker instance; its class supplies the name
      # @param msg [Hash] job payload; the 'jid', 'args' and 'retry' keys are read
      # @param queue [Object] queue identifier stored on a newly built record
      # @return [Object] the result of Job#update_attributes
      def start(worker, msg, queue)
        jid = msg['jid']
        args = msg['args']

        job = Sidekiq::Monitor::Job.find_by_jid(jid)
        job ||= Sidekiq::Monitor::Job.new(
          jid: jid,
          queue: queue,
          class_name: worker.class.name,
          args: args,
          retry: msg['retry'],
          name: job_name(worker.class, args)
        )

        job.update_attributes(
          started_at: DateTime.now,
          status: 'running'
        )
      end

      # Marks a job as failed and stores the exception message and backtrace.
      #
      # @param worker [Object] unused
      # @param msg [Hash] job payload; only 'jid' is read
      # @param queue [Object] unused
      # @param exception [Exception] the error raised by the job
      # @return [Object, nil] the result of Job#update_attributes, or nil when
      #   no record exists for the jid
      def error(worker, msg, queue, exception)
        finish_job(
          msg,
          'failed',
          message: exception.message,
          backtrace: exception.backtrace
        )
      end

      # Marks a job as complete. The return value is stored as the result only
      # when it is a Hash; any other value is recorded as nil.
      #
      # @param worker [Object] unused
      # @param msg [Hash] job payload; only 'jid' is read
      # @param queue [Object] unused
      # @param return_value [Object] value returned by the job
      # @return [Object, nil] the result of Job#update_attributes, or nil when
      #   no record exists for the jid
      def complete(worker, msg, queue, return_value)
        finish_job(msg, 'complete', (return_value if return_value.is_a?(Hash)))
      end

      protected

      # Looks up the Job record for the jid carried by +msg+.
      def find_job(msg)
        Sidekiq::Monitor::Job.find_by_jid(msg['jid'])
      end

      # Stamps the finish time, final status and result on the job for +msg+.
      # Returns nil without raising when no record is found, so a missing
      # record cannot replace the job's own exception with a NoMethodError.
      def finish_job(msg, status, result)
        job = find_job(msg)
        return if job.nil?

        job.update_attributes(
          finished_at: DateTime.now,
          status: status,
          result: result
        )
      end

      # Returns worker_class.job_name(*args) when the class defines it, else nil.
      def job_name(worker_class, args)
        worker_class.respond_to?(:job_name) ? worker_class.job_name(*args) : nil
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
sidekiq_monitor-0.0.3 | lib/sidekiq/monitor/processor.rb |
sidekiq_monitor-0.0.2 | lib/sidekiq/monitor/processor.rb |
sidekiq_monitor-0.0.1 | lib/sidekiq/monitor/processor.rb |