module CanvasSync
  module JobBatches
    class Batch
      # Runs user-defined batch callbacks and performs the bookkeeping
      # ("finalization") that follows a batch event.
      module Callback
        # Module-level accessor; presumably holds the worker class that mixes in
        # CallbackWorkerCommon — TODO confirm against the code that assigns it.
        mattr_accessor :worker_class

        # Event names accepted by CallbackWorkerCommon#perform and Finalize#dispatch.
        VALID_CALLBACKS = %w[success complete death].freeze

        # Shared +perform+ implementation for callback workers.
        module CallbackWorkerCommon
          # Resolves a callback definition to a receiver and method, then invokes it
          # with the batch status and the callback options.
          #
          # @param definition [String, Object] callback target. A String may be a
          #   bare constant name (calls +on_<event>+ on a new instance),
          #   "Const#method" (calls +method+ on a new instance), or "Const.method"
          #   (calls +method+ on the constant itself).
          # @param event [String] event name; anything not in VALID_CALLBACKS makes
          #   this method return without doing anything.
          # @param opts [Object] passed through to the callback as second argument.
          # @param bid [String] batch id used to build the Batch::Status handed to
          #   the callback.
          # @param parent_bid [String, nil] not used by this method.
          def perform(definition, event, opts, bid, parent_bid)
            return unless VALID_CALLBACKS.include?(event)

            method = nil
            target = :instance
            clazz = definition
            if clazz.is_a?(String)
              if clazz.include?('#')
                clazz, method = clazz.split("#")
              elsif clazz.include?('.')
                clazz, method = clazz.split(".")
                target = :class
              end
            end

            method ||= "on_#{event}"
            status = Batch::Status.new(bid)

            # NOTE(review): Object.const_get raises NameError for an unknown
            # constant, so the "Class ... not found" branch below is only reached
            # when +definition+ is nil/false — confirm whether a rescue was intended.
            if clazz && (object = Object.const_get(clazz))
              target = target == :instance ? object.new : object
              # respond_to?(…, true) also accepts private methods, matching +send+.
              if target.respond_to?(method, true)
                target.send(method, status, opts)
              else
                Batch.logger.warn("Invalid callback method #{definition} - #{target.to_s} does not respond to #{method}")
              end
            else
              Batch.logger.warn("Invalid callback method #{definition} - Class #{clazz} not found")
            end
          end
        end

        # Updates Redis bookkeeping after a batch event and propagates the event
        # to the parent batch where applicable.
        class Finalize
          # Runs the finalizer for +opts["event"]+ on batch +opts["bid"]+, removes
          # the "<event>-finalize" entry from the batch's pending callbacks and,
          # for +success+, cleans up Redis state once no callbacks remain pending.
          #
          # @param status [Object] not used; a fresh Status is built from
          #   +opts["bid"]+.
          # @param opts [Hash] reads "bid", "event" and, optionally, "origin"
          #   (a hash with "for_bid" and "event").
          def dispatch(status, opts)
            bid = opts["bid"]
            event_name = opts["event"].to_s

            # Only dispatch to the known finalizers. Without this check the
            # event name would be handed straight to +send+, which can call any
            # method (private ones included).
            unless VALID_CALLBACKS.include?(event_name)
              Batch.logger.warn("Invalid finalize event #{event_name.inspect} for batch id: #{bid}")
              return
            end

            event = event_name.to_sym

            Batch.logger.debug { "Finalize #{event} batch id: #{bid}" }

            batch_status = Status.new(bid)
            send(event, bid, batch_status, batch_status.parent_bid)

            Batch.redis do |r|
              r.srem("BID-#{bid}-pending_callbacks", "#{event}-finalize")
            end

            if event == :success
              if opts['origin'].present?
                # This is a callback for a callback. In this case we need to check
                # if we should clean up the original bid.
                origin_bid = opts['origin']['for_bid']
                _, pending, success_ran = Batch.redis do |r|
                  r.multi do |pipeline|
                    pipeline.srem("BID-#{origin_bid}-pending_callbacks", opts['origin']['event'])
                    pipeline.scard("BID-#{origin_bid}-pending_callbacks")
                    pipeline.hget("BID-#{origin_bid}", "success")
                  end
                end
                Batch.cleanup_redis(origin_bid) if pending == 0 && success_ran == 'true'
              end

              remaining = Batch.redis { |r| r.scard("BID-#{bid}-pending_callbacks") }
              Batch.cleanup_redis(bid) if remaining == 0
            end
          end

          # Records +bid+ as a successful and complete child of +parent_bid+, then
          # enqueues the parent's +complete+ and/or +success+ callbacks when the
          # parent's counters say it has reached that state.
          #
          # @param bid [String] id of the batch that succeeded.
          # @param status [Object] not used.
          # @param parent_bid [String, nil] parent batch id; nothing happens when nil.
          def success(bid, status, parent_bid)
            return unless parent_bid

            # Ten commands, ten results. The first SCARD of batches-success is
            # discarded; the value used below comes from the later, identical SCARD.
            _, _, _, _, _, complete, pending, children, success, failure = Batch.redis do |r|
              r.multi do |pipeline|
                pipeline.sadd("BID-#{parent_bid}-batches-success", bid)
                pipeline.expire("BID-#{parent_bid}-batches-success", Batch::BID_EXPIRE_TTL)
                pipeline.scard("BID-#{parent_bid}-batches-success")
                pipeline.srem("BID-#{parent_bid}-batches-failed", bid)
                pipeline.sadd("BID-#{parent_bid}-batches-complete", bid)
                pipeline.scard("BID-#{parent_bid}-batches-complete")
                # HINCRBY by 0 reads the counter as an Integer.
                pipeline.hincrby("BID-#{parent_bid}", "pending", 0)
                pipeline.hincrby("BID-#{parent_bid}", "children", 0)
                pipeline.scard("BID-#{parent_bid}-batches-success")
                pipeline.scard("BID-#{parent_bid}-failed")
              end
            end

            if complete == children && pending == failure
              Batch.logger.debug { "Finalize parent complete bid: #{parent_bid}" }
              Batch.enqueue_callbacks(:complete, parent_bid)
            end

            if pending.to_i.zero? && children == success
              Batch.logger.debug { "Finalize parent success bid: #{parent_bid}" }
              Batch.enqueue_callbacks(:success, parent_bid)
            end
          end

          # When +bid+ completed without being fully successful, records it as a
          # complete and failed child of +parent_bid+ and enqueues the parent's
          # +complete+ callbacks if the parent is now complete.
          #
          # @param bid [String] id of the batch that completed.
          # @param status [Object] not used.
          # @param parent_bid [String, nil] parent batch id; the parent is left
          #   untouched when nil.
          def complete(bid, status, parent_bid)
            pending, children, success = Batch.redis do |r|
              r.multi do |pipeline|
                pipeline.hincrby("BID-#{bid}", "pending", 0)
                pipeline.hincrby("BID-#{bid}", "children", 0)
                pipeline.scard("BID-#{bid}-batches-success")
              end
            end

            if parent_bid && !(pending.to_i.zero? && children == success)
              # If the batch was not successful, check whether its parent is
              # complete; if it is, trigger the parent's complete callback.
              # We don't want to run this if the batch was successful because the
              # success callback may add more jobs to the parent batch.
              Batch.logger.debug { "Finalize parent complete bid: #{parent_bid}" }
              _, _, complete, pending, children, failure = Batch.redis do |r|
                r.multi do |pipeline|
                  pipeline.sadd("BID-#{parent_bid}-batches-complete", bid)
                  pipeline.sadd("BID-#{parent_bid}-batches-failed", bid)
                  pipeline.scard("BID-#{parent_bid}-batches-complete")
                  pipeline.hincrby("BID-#{parent_bid}", "pending", 0)
                  pipeline.hincrby("BID-#{parent_bid}", "children", 0)
                  pipeline.scard("BID-#{parent_bid}-failed")
                end
              end

              if complete == children && pending == failure
                Batch.enqueue_callbacks(:complete, parent_bid)
              end
            end
          end

          # Propagates a +death+ event to the parent batch, if there is one.
          #
          # @param bid [String] not used.
          # @param status [Object] not used.
          # @param parent_bid [String, nil] parent batch id; nothing happens when nil.
          def death(bid, status, parent_bid)
            return unless parent_bid

            Batch.enqueue_callbacks(:death, parent_bid)
          end
        end
      end
    end
  end
end