lib/sidekiq/hierarchy/job.rb in sidekiq-hierarchy-1.1.0 vs lib/sidekiq/hierarchy/job.rb in sidekiq-hierarchy-2.0.0

- old
+ new

@@ -5,15 +5,20 @@ # Job hash keys INFO_FIELD = 'i'.freeze PARENT_FIELD = 'p'.freeze STATUS_FIELD = 's'.freeze - WORKFLOW_STATUS_FIELD = 'w'.freeze + ENQUEUED_AT_FIELD = 'e'.freeze RUN_AT_FIELD = 'r'.freeze COMPLETED_AT_FIELD = 'c'.freeze + WORKFLOW_STATUS_FIELD = 'w'.freeze + WORKFLOW_FINISHED_AT_FIELD = 'wf'.freeze + SUBTREE_SIZE_FIELD = 't'.freeze + FINISHED_SUBTREE_SIZE_FIELD = 'tf'.freeze + # Values for STATUS_FIELD STATUS_ENQUEUED = '0'.freeze STATUS_RUNNING = '1'.freeze STATUS_COMPLETE = '2'.freeze STATUS_REQUEUED = '3'.freeze @@ -35,10 +40,11 @@ alias_method :find, :new def create(jid, job_hash) new(jid).tap do |job| job[INFO_FIELD] = Sidekiq.dump_json(filtered_job_hash(job_hash)) + job.increment_subtree_size # start at subtree size 1 -- no children end end # saves INFO_KEYS as well as whatever keys are specified # in the worker's sidekiq options under :workflow_keys @@ -119,10 +125,48 @@ def leaves # This could be done in a single Lua script server-side... self.leaf? ? [self] : children.flat_map(&:leaves) end + # Walks the subtree rooted here in DFS order + # Returns an Enumerator; use #to_a to get an array instead + def subtree_jobs + to_visit = [self] + Enumerator.new do |y| + while node = to_visit.pop + y << node # sugar for yielding a value + to_visit += node.children + end + end + end + + # The cached cardinality of the tree rooted at this job + def subtree_size + self[SUBTREE_SIZE_FIELD].to_i + end + + # Recursively updates subtree size on this and all higher subtrees + def increment_subtree_size(incr=1) + redis { |conn| conn.hincrby(redis_job_hkey, SUBTREE_SIZE_FIELD, incr) } + if p_job = parent + p_job.increment_subtree_size(incr) + end + end + + # The cached count of the finished jobs in the tree rooted at this job + def finished_subtree_size + self[FINISHED_SUBTREE_SIZE_FIELD].to_i + end + + # Recursively updates finished subtree size on this and all higher subtrees + def increment_finished_subtree_size(incr=1) + redis { |conn| conn.hincrby(redis_job_hkey, FINISHED_SUBTREE_SIZE_FIELD, incr) } + if p_job = parent + p_job.increment_finished_subtree_size(incr) + end + end + # Draws a new doubly-linked parent-child relationship in redis def add_child(child_job) redis do |conn| conn.multi do # draw child->parent relationship @@ -131,11 +175,14 @@ # draw parent->child relationship conn.rpush(redis_children_lkey, child_job.jid) conn.expire(redis_children_lkey, ONE_MONTH) end end - true # will never fail w/o raising an exception + + # updates subtree counts to reflect new child + increment_subtree_size(child_job.subtree_size) + increment_finished_subtree_size(child_job.finished_subtree_size) end def workflow Workflow.find(root) end @@ -177,10 +224,11 @@ s_val, t_field = STATUS_FAILED, COMPLETED_AT_FIELD end self[STATUS_FIELD] = s_val self[t_field] = Time.now.to_f.to_s if t_field + increment_finished_subtree_size if [:failed, :complete].include?(new_status) Sidekiq::Hierarchy.publish(Notifications::JOB_UPDATE, self, new_status, old_status) end # Status update: mark as enqueued (step 1) @@ -246,9 +294,13 @@ def failed_at if failed? && t = self[COMPLETED_AT_FIELD] Time.at(t.to_f) end + end + + def finished? + [:failed, :complete].include?(status) # two terminal states end def finished_at if t = self[COMPLETED_AT_FIELD] Time.at(t.to_f)