lib/qless/job.rb in qless-0.9.1 vs lib/qless/job.rb in qless-0.9.2
- old
+ new
@@ -8,25 +8,25 @@
class BaseJob
def initialize(client, jid)
@client = client
@jid = jid
end
-
+
def klass
@klass ||= @klass_name.split('::').inject(Kernel) { |context, name| context.const_get(name) }
end
-
+
def queue
@queue ||= Queue.new(@queue_name, @client)
end
end
-
+
class Job < BaseJob
attr_reader :jid, :expires_at, :state, :queue_name, :history, :worker_name, :failure, :klass_name, :tracked, :dependencies, :dependents
attr_reader :original_retries, :retries_left
attr_accessor :data, :priority, :tags
-
+
def perform
klass.perform(self)
end
def self.build(client, klass, attributes = {})
@@ -50,18 +50,18 @@
}
attributes = defaults.merge(Qless.stringify_hash_keys(attributes))
attributes["data"] = JSON.load(JSON.dump attributes["data"])
new(client, attributes)
end
-
+
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags state tracked
failure history dependencies dependents}.each do |att|
self.instance_variable_set("@#{att}".to_sym, atts.fetch(att))
end
-
+
@expires_at = atts.fetch('expires')
@klass_name = atts.fetch('klass')
@queue_name = atts.fetch('queue')
@worker_name = atts.fetch('worker')
@original_retries = atts.fetch('retries')
@@ -71,50 +71,50 @@
@tags = [] if @tags == {}
@dependents = [] if @dependents == {}
@dependencies = [] if @dependencies == {}
@state_changed = false
end
-
+
def priority=(priority)
if @client._priority.call([], [@jid, priority])
@priority = priority
end
end
-
+
def [](key)
@data[key]
end
-
+
def []=(key, val)
@data[key] = val
end
-
+
def to_s
inspect
end
def description
"#{@jid} (#{@klass_name} / #{@queue_name})"
end
-
+
def inspect
"<Qless::Job #{description}>"
end
-
+
def ttl
@expires_at - Time.now.to_f
end
-
+
# Move this from it's current queue into another
def move(queue)
note_state_change do
@client._put.call([queue], [
@jid, @klass_name, JSON.generate(@data), Time.now.to_f, 0
])
end
end
-
+
# Fail a job
def fail(group, message)
note_state_change do
@client._fail.call([], [
@jid,
@@ -122,20 +122,20 @@
group, message,
Time.now.to_f,
JSON.generate(@data)]) || false
end
end
-
+
# Heartbeat a job
def heartbeat()
@client._heartbeat.call([], [
@jid,
@worker_name,
Time.now.to_f,
JSON.generate(@data)]) || false
end
-
+
# Complete a job
# Options include
# => next (String) the next queue
# => delay (int) how long to delay it in the next queue
def complete(nxt=nil, options={})
@@ -153,42 +153,44 @@
end
def state_changed?
@state_changed
end
-
+
def cancel
note_state_change do
@client._cancel.call([], [@jid])
end
end
-
+
def track()
@client._track.call([], ['track', @jid, Time.now.to_f])
end
-
+
def untrack
@client._track.call([], ['untrack', @jid, Time.now.to_f])
end
-
+
def tag(*tags)
@client._tag.call([], ['add', @jid, Time.now.to_f] + tags)
end
-
+
def untag(*tags)
@client._tag.call([], ['remove', @jid, Time.now.to_f] + tags)
end
-
+
def retry(delay=0)
- results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
- results.nil? ? false : results
+ note_state_change do
+ results = @client._retry.call([], [@jid, @queue_name, @worker_name, Time.now.to_f, delay])
+ results.nil? ? false : results
+ end
end
-
+
def depend(*jids)
!!@client._depends.call([], [@jid, 'on'] + jids)
end
-
+
def undepend(*jids)
!!@client._depends.call([], [@jid, 'off'] + jids)
end
private
@@ -197,62 +199,62 @@
result = yield
@state_changed = true
result
end
end
-
+
class RecurringJob < BaseJob
attr_reader :jid, :data, :priority, :tags, :retries, :interval, :count, :queue_name, :klass_name
-
+
def initialize(client, atts)
super(client, atts.fetch('jid'))
%w{jid data priority tags retries interval count}.each do |att|
self.instance_variable_set("@#{att}".to_sym, atts.fetch(att))
end
-
+
@klass_name = atts.fetch('klass')
@queue_name = atts.fetch('queue')
@tags = [] if @tags == {}
end
-
+
def priority=(value)
@client._recur.call([], ['update', @jid, 'priority', value])
@priority = value
end
-
+
def retries=(value)
@client._recur.call([], ['update', @jid, 'retries', value])
@retries = value
end
-
+
def interval=(value)
@client._recur.call([], ['update', @jid, 'interval', value])
@interval = value
end
-
+
def data=(value)
@client._recur.call([], ['update', @jid, 'data', JSON.generate(value)])
@data = value
end
-
+
def klass=(value)
@client._recur.call([], ['update', @jid, 'klass', value.to_s])
@klass_name = value.to_s
end
-
+
def move(queue)
@client._recur.call([], ['update', @jid, 'queue', queue])
@queue_name = queue
end
-
+
def cancel
@client._recur.call([], ['off', @jid])
end
-
+
def tag(*tags)
@client._recur.call([], ['tag', @jid] + tags)
end
-
+
def untag(*tags)
@client._recur.call([], ['untag', @jid] + tags)
end
end
end