lib/sidekiq/hierarchy/job.rb in sidekiq-hierarchy-1.1.0 vs lib/sidekiq/hierarchy/job.rb in sidekiq-hierarchy-2.0.0
- old
+ new
@@ -5,15 +5,20 @@
# Job hash keys
INFO_FIELD = 'i'.freeze
PARENT_FIELD = 'p'.freeze
STATUS_FIELD = 's'.freeze
- WORKFLOW_STATUS_FIELD = 'w'.freeze
+
ENQUEUED_AT_FIELD = 'e'.freeze
RUN_AT_FIELD = 'r'.freeze
COMPLETED_AT_FIELD = 'c'.freeze
+ WORKFLOW_STATUS_FIELD = 'w'.freeze
+ WORKFLOW_FINISHED_AT_FIELD = 'wf'.freeze
+ SUBTREE_SIZE_FIELD = 't'.freeze
+ FINISHED_SUBTREE_SIZE_FIELD = 'tf'.freeze
+
# Values for STATUS_FIELD
STATUS_ENQUEUED = '0'.freeze
STATUS_RUNNING = '1'.freeze
STATUS_COMPLETE = '2'.freeze
STATUS_REQUEUED = '3'.freeze
@@ -35,10 +40,11 @@
alias_method :find, :new
# Creates and persists a new Job for the given jid, storing the
# filtered job hash as serialized info and seeding its subtree size.
def create(jid, job_hash)
  job = new(jid)
  job[INFO_FIELD] = Sidekiq.dump_json(filtered_job_hash(job_hash))
  job.increment_subtree_size # a brand-new job counts only itself: size 1, no children
  job
end
# saves INFO_KEYS as well as whatever keys are specified
# in the worker's sidekiq options under :workflow_keys
@@ -119,10 +125,48 @@
def leaves
# This could be done in a single Lua script server-side...
self.leaf? ? [self] : children.flat_map(&:leaves)
end
# Walks the subtree rooted at this job in DFS order (siblings are
# visited right-to-left, since children are pushed onto a stack).
# Returns an Enumerator; use #to_a to get an array instead.
#
# The work-list is created inside the Enumerator block so every
# enumeration starts fresh; previously it was captured shared state,
# so a second #each/#to_a on the same Enumerator yielded nothing.
def subtree_jobs
  Enumerator.new do |yielder|
    to_visit = [self]
    until to_visit.empty?
      node = to_visit.pop
      yielder << node # sugar for yielding a value
      to_visit.concat(node.children)
    end
  end
end
+
# The cached cardinality of the tree rooted at this job.
# Missing field reads as 0 (nil.to_i).
def subtree_size
  raw = self[SUBTREE_SIZE_FIELD]
  raw.to_i
end
+
# Adds +incr+ to this job's cached subtree size, then propagates the
# same delta recursively to every ancestor so all enclosing subtree
# counts stay in agreement. One Redis HINCRBY per ancestor.
def increment_subtree_size(incr=1)
  redis { |conn| conn.hincrby(redis_job_hkey, SUBTREE_SIZE_FIELD, incr) }
  ancestor = parent
  ancestor.increment_subtree_size(incr) if ancestor
end
+
# The cached count of finished jobs in the tree rooted at this job.
# Missing field reads as 0 (nil.to_i).
def finished_subtree_size
  raw = self[FINISHED_SUBTREE_SIZE_FIELD]
  raw.to_i
end
+
# Adds +incr+ to this job's cached finished-job count, then propagates
# the same delta recursively to every ancestor's count. One Redis
# HINCRBY per ancestor.
def increment_finished_subtree_size(incr=1)
  redis { |conn| conn.hincrby(redis_job_hkey, FINISHED_SUBTREE_SIZE_FIELD, incr) }
  ancestor = parent
  ancestor.increment_finished_subtree_size(incr) if ancestor
end
+
# Draws a new doubly-linked parent-child relationship in redis
def add_child(child_job)
redis do |conn|
conn.multi do
# draw child->parent relationship
@@ -131,11 +175,14 @@
# draw parent->child relationship
conn.rpush(redis_children_lkey, child_job.jid)
conn.expire(redis_children_lkey, ONE_MONTH)
end
end
- true # will never fail w/o raising an exception
+
+ # updates subtree counts to reflect new child
+ increment_subtree_size(child_job.subtree_size)
+ increment_finished_subtree_size(child_job.finished_subtree_size)
end
# The Workflow this job belongs to, located via the root job of its tree.
def workflow
  root_job = root
  Workflow.find(root_job)
end
@@ -177,10 +224,11 @@
s_val, t_field = STATUS_FAILED, COMPLETED_AT_FIELD
end
self[STATUS_FIELD] = s_val
self[t_field] = Time.now.to_f.to_s if t_field
+ increment_finished_subtree_size if [:failed, :complete].include?(new_status)
Sidekiq::Hierarchy.publish(Notifications::JOB_UPDATE, self, new_status, old_status)
end
# Status update: mark as enqueued (step 1)
@@ -246,9 +294,13 @@
# Time at which this job failed, or nil when the job has not failed
# or has no completion timestamp recorded.
def failed_at
  return nil unless failed?
  timestamp = self[COMPLETED_AT_FIELD]
  Time.at(timestamp.to_f) if timestamp
end
+
# True when the job has reached one of its two terminal states,
# :failed or :complete.
def finished?
  status == :failed || status == :complete
end
def finished_at
if t = self[COMPLETED_AT_FIELD]
Time.at(t.to_f)