Sha256: 6c0b64a92349576e01f785523a1865d7efb10c9b4091dfd7c33690b409562699
Contents?: true
Size: 1.94 KB
Versions: 1
Compression:
Stored size: 1.94 KB
Contents
module Dynamiq
  # Wraps a single queued message together with the queue it was read from and
  # exposes the fields of the decoded payload.
  class Job
    attr_reader :queue, :message, :item

    # @param queue [Object] queue the message belongs to; this class calls
    #   +name+ and +requeue+ on it.
    # @param message [Hash, String] an already-decoded payload, or a JSON
    #   string that is decoded with +Sidekiq.load_json+.
    def initialize(queue, message = nil)
      @queue = queue
      @message = message
      @item = message.is_a?(Hash) ? message : Sidekiq.load_json(message)
    end

    # @return [Object] the payload's 'class' entry.
    def klass
      @item['class']
    end

    # Unwrap known wrappers so they show up in a human-friendly manner in the
    # Web UI. The result is memoized.
    #
    # NOTE(review): +safe_load+ is not defined in the part of the class visible
    # here - confirm it is provided elsewhere before relying on the
    # delayed-extension branch.
    def display_class
      @klass ||= case klass
                 when /\ASidekiq::Extensions::Delayed/
                   safe_load(args[0], klass) do |target, method, _|
                     "#{target}.#{method}"
                   end
                 when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
                   args[0]
                 else
                   klass
                 end
    end

    # Unwrap known wrappers so they show up in a human-friendly manner in the
    # Web UI. The result is memoized.
    #
    # NOTE(review): relies on the same +safe_load+ helper as #display_class.
    def display_args
      @args ||= case klass
                when /\ASidekiq::Extensions::Delayed/
                  safe_load(args[0], args) do |_, _, arg|
                    arg
                  end
                when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
                  args[1..-1]
                else
                  args
                end
    end

    # @return [Object] the payload's 'args' entry.
    def args
      @item['args']
    end

    # @return [Object] the payload's 'jid' entry.
    def jid
      @item['jid']
    end

    # @return [Object] the payload's 'score' entry.
    def score
      @item['score']
    end

    # @return [Time] UTC time built from the payload's 'enqueued_at' entry;
    #   a missing entry is treated as 0 (the epoch).
    def enqueued_at
      Time.at(@item['enqueued_at'] || 0).utc
    end

    # Seconds elapsed between the payload's 'enqueued_at' value and now.
    #
    # @return [Float] 0.0 when the payload carries no 'enqueued_at' entry
    #   (previously this raised a TypeError).
    def latency
      now = Time.now.to_f
      now - (@item['enqueued_at'] || now)
    end

    def acknowledge
      # nothing to do
    end

    # Reads an arbitrary entry of the decoded payload.
    #
    # @param name [Object] payload key
    def [](name)
      @item[name]
    end

    # @return [Object] whatever the queue object reports as its name.
    def queue_name
      queue.name
    end

    # Hands the original message back to the queue via +queue.requeue+.
    def requeue
      queue.requeue message
    end

    # Removes this job's entry from the sorted set keyed
    # "dynamic_queue:<queue>" through +Sidekiq.redis+.
    #
    # The member sent to ZREM is the payload without its 'score' entry,
    # serialized with +to_json+. A copy is stripped so the job's own payload
    # (and therefore #score) is left intact.
    #
    # NOTE(review): the key is built from the queue object's string form,
    # whereas #queue_name uses +queue.name+ - confirm both yield the same name.
    #
    # @return [Integer] 1 when the ZREM reply is truthy, otherwise 0.
    def delete
      deleted = Sidekiq.redis do |conn|
        # The original read @value, which is never assigned in this class;
        # the decoded payload is the hash that actually carries 'score'.
        rem_value = @item.dup
        rem_value.delete 'score'
        conn.zrem [:dynamic_queue, @queue].join(':'), rem_value.to_json
      end
      deleted ? 1 : 0
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dynamiq-0.1.0 | lib/dynamiq/job.rb |