lib/qless/job.rb in qless-0.9.2 vs lib/qless/job.rb in qless-0.9.3

- old
+ new

@@ -1,34 +1,53 @@ require "qless" require "qless/queue" -require "qless/lua" require "redis" require "json" module Qless class BaseJob + attr_reader :client + def initialize(client, jid) @client = client @jid = jid end def klass - @klass ||= @klass_name.split('::').inject(Kernel) { |context, name| context.const_get(name) } + @klass ||= @klass_name.split('::').inject(Object) { |context, name| context.const_get(name) } end def queue @queue ||= Queue.new(@queue_name, @client) end end class Job < BaseJob - attr_reader :jid, :expires_at, :state, :queue_name, :history, :worker_name, :failure, :klass_name, :tracked, :dependencies, :dependents - attr_reader :original_retries, :retries_left + attr_reader :jid, :expires_at, :state, :queue_name, :worker_name, :failure, :klass_name, :tracked, :dependencies, :dependents + attr_reader :original_retries, :retries_left, :raw_queue_history attr_accessor :data, :priority, :tags + MiddlewareMisconfiguredError = Class.new(StandardError) + + module SupportsMiddleware + def around_perform(job) + perform(job) + end + end + def perform - klass.perform(self) + middlewares = Job.middlewares_on(klass) + + if middlewares.last == SupportsMiddleware + klass.around_perform(self) + elsif middlewares.any? + raise MiddlewareMisconfiguredError, "The middleware chain for #{klass} " + + "(#{middlewares.inspect}) is misconfigured. Qless::Job::SupportsMiddleware " + + "must be extended onto your job class first if you want to use any middleware." + else + klass.perform(self) + end end def self.build(client, klass, attributes = {}) defaults = { "jid" => Qless.generate_jid, @@ -47,33 +66,42 @@ "history" => [], "dependencies" => [], "dependents" => [] } attributes = defaults.merge(Qless.stringify_hash_keys(attributes)) - attributes["data"] = JSON.load(JSON.dump attributes["data"]) + attributes["data"] = JSON.parse(JSON.dump attributes["data"]) new(client, attributes) end + def self.middlewares_on(job_klass) + job_klass.singleton_class.ancestors.select do |ancestor| + ancestor.method_defined?(:around_perform) + end + end + def initialize(client, atts) super(client, atts.fetch('jid')) %w{jid data priority tags state tracked - failure history dependencies dependents}.each do |att| + failure dependencies dependents}.each do |att| self.instance_variable_set("@#{att}".to_sym, atts.fetch(att)) end - @expires_at = atts.fetch('expires') - @klass_name = atts.fetch('klass') - @queue_name = atts.fetch('queue') - @worker_name = atts.fetch('worker') - @original_retries = atts.fetch('retries') - @retries_left = atts.fetch('remaining') + @expires_at = atts.fetch('expires') + @klass_name = atts.fetch('klass') + @queue_name = atts.fetch('queue') + @worker_name = atts.fetch('worker') + @original_retries = atts.fetch('retries') + @retries_left = atts.fetch('remaining') + @raw_queue_history = atts.fetch('history') # This is a silly side-effect of Lua doing JSON parsing @tags = [] if @tags == {} @dependents = [] if @dependents == {} @dependencies = [] if @dependencies == {} @state_changed = false + @before_callbacks = Hash.new { |h, k| h[k] = [] } + @after_callbacks = Hash.new { |h, k| h[k] = [] } end def priority=(priority) if @client._priority.call([], [@jid, priority]) @priority = priority @@ -91,33 +119,81 @@ def to_s inspect end def description - "#{@jid} (#{@klass_name} / #{@queue_name})" + "#{@klass_name} (#{@jid} / #{@queue_name} / #{@state})" end def inspect "<Qless::Job #{description}>" end def ttl @expires_at - Time.now.to_f end + def reconnect_to_redis + @client.redis.client.reconnect + end + + def history + warn "WARNING: Qless::Job#history is deprecated; use Qless::Job#raw_queue_history instead" + + "; called from:\n#{caller.first}\n" + raw_queue_history + end + + def queue_history + @queue_history ||= @raw_queue_history.map do |history_event| + history_event.each_with_object({}) do |(key, value), hash| + # The only Numeric (Integer or Float) values we get in the history are timestamps + hash[key] = if value.is_a?(Numeric) + Time.at(value).utc + else + value + end + end + end + end + + def initially_put_at + @initially_put_at ||= history_timestamp('put', :min) + end + + def to_hash + { + jid: jid, + expires_at: expires_at, + state: state, + queue_name: queue_name, + history: raw_queue_history, + worker_name: worker_name, + failure: failure, + klass_name: klass_name, + tracked: tracked, + dependencies: dependencies, + dependents: dependents, + original_retries: original_retries, + retries_left: retries_left, + data: data, + priority: priority, + tags: tags + } + end + # Move this from it's current queue into another def move(queue) - note_state_change do + note_state_change :move do @client._put.call([queue], [ @jid, @klass_name, JSON.generate(@data), Time.now.to_f, 0 ]) end end # Fail a job def fail(group, message) - note_state_change do + note_state_change :fail do @client._fail.call([], [ @jid, @worker_name, group, message, Time.now.to_f, @@ -132,34 +208,47 @@ @worker_name, Time.now.to_f, JSON.generate(@data)]) || false end + CantCompleteError = Class.new(Qless::Error) + # Complete a job # Options include # => next (String) the next queue # => delay (int) how long to delay it in the next queue def complete(nxt=nil, options={}) - response = note_state_change do - if nxt.nil? + note_state_change :complete do + response = if nxt.nil? @client._complete.call([], [ @jid, @worker_name, @queue_name, Time.now.to_f, JSON.generate(@data)]) else @client._complete.call([], [ @jid, @worker_name, @queue_name, Time.now.to_f, JSON.generate(@data), 'next', nxt, 'delay', options.fetch(:delay, 0), 'depends', JSON.generate(options.fetch(:depends, []))]) end + + if response + response + else + description = if reloaded_instance = @client.jobs[@jid] + reloaded_instance.description + else + self.description + " -- can't be reloaded" + end + + raise CantCompleteError, "Failed to complete #{description}" + end end - response.nil? ? false : response end def state_changed? @state_changed end def cancel - note_state_change do + note_state_change :cancel do @client._cancel.call([], [@jid]) end end def track() @@ -177,11 +266,11 @@ def untag(*tags) @client._tag.call([], ['remove', @jid, Time.now.to_f] + tags) end def retry(delay=0) - note_state_change do + note_state_change :retry do results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay]) results.nil? ? false : results end end @@ -191,15 +280,31 @@ def undepend(*jids) !!@client._depends.call([], [@jid, 'off'] + jids) end + [:fail, :complete, :cancel, :move, :retry].each do |event| + define_method :"before_#{event}" do |&block| + @before_callbacks[event] << block + end + + define_method :"after_#{event}" do |&block| + @after_callbacks[event].unshift block + end + end + private - def note_state_change + def note_state_change(event) + @before_callbacks[event].each { |blk| blk.call(self) } result = yield @state_changed = true + @after_callbacks[event].each { |blk| blk.call(self) } result + end + + def history_timestamp(name, selector) + queue_history.map { |q| q[name] }.compact.send(selector) end end class RecurringJob < BaseJob attr_reader :jid, :data, :priority, :tags, :retries, :interval, :count, :queue_name, :klass_name