module CanvasSync
  module JobBatches
    class Batch
      # Callback plumbing for batches: the worker mixin that invokes a
      # user-supplied callback, the default ActiveJob-backed worker, and the
      # Finalize handler that propagates :complete / :success up to a parent batch.
      module Callback
        # Worker class used to enqueue callback jobs. Defaults to
        # ActiveJobCallbackWorker (assigned below) unless already configured.
        mattr_accessor :worker_class

        # Event names that #perform is willing to act on.
        VALID_CALLBACKS = %w[success complete dead].freeze

        # Shared #perform implementation mixed into the callback worker classes.
        module CallbackWorkerCommon
          # Resolves a callback definition and invokes it with the batch status.
          #
          # @param definition [String, Object] callback target. A String may be
          #   "Klass" (calls `on_<event>` on a new instance), "Klass#meth"
          #   (calls `meth` on a new instance) or "Klass.meth" (calls `meth` on
          #   the class itself).
          # @param event [String] one of VALID_CALLBACKS; anything else is ignored.
          # @param opts [Object] passed through to the callback as its second argument.
          # @param bid [String] batch id used to build the Batch::Status.
          # @param parent_bid [String, nil] not used by this method.
          # @return [Object, nil] whatever the callback (or the logger) returns;
          #   nil when the event is not a valid callback.
          def perform(definition, event, opts, bid, parent_bid)
            return unless VALID_CALLBACKS.include?(event)

            method = nil
            target = :instance
            clazz = definition

            if clazz.is_a?(String)
              if clazz.include?('#')
                clazz, method = clazz.split('#')
              elsif clazz.include?('.')
                clazz, method = clazz.split('.')
                target = :class
              end
            end

            method ||= "on_#{event}"
            status = Batch::Status.new(bid)

            # NOTE(review): Object.const_get raises NameError for an undefined
            # constant instead of returning nil, so the "not found" warning is
            # only logged when clazz is nil/false or resolves to a falsy value.
            object = clazz && Object.const_get(clazz)
            if object
              target = target == :instance ? object.new : object
              if target.respond_to?(method)
                target.send(method, status, opts)
              else
                Batch.logger.warn("Invalid callback method #{definition} - #{target} does not respond to #{method}")
              end
            else
              Batch.logger.warn("Invalid callback method #{definition} - Class #{clazz} not found")
            end
          end
        end

        # ActiveJob-backed callback worker.
        class ActiveJobCallbackWorker < ActiveJob::Base
          include CallbackWorkerCommon

          # Enqueues one callback job per argument set on the given queue.
          #
          # @param args [Array<Array>] each element is splatted into #perform_later.
          # @param queue [String, Symbol] queue passed to `set(queue: ...)`.
          def self.enqueue_all(args, queue)
            args.each do |arg_set|
              set(queue: queue).perform_later(*arg_set)
            end
          end
        end

        # A bare `worker_class = ...` here would only create a local variable;
        # the explicit receiver is required to hit the mattr_accessor writer.
        # `||=` keeps a worker class that was configured before this point.
        self.worker_class ||= ActiveJobCallbackWorker

        # Handles the bookkeeping that runs after a batch's callbacks finish.
        class Finalize
          # Runs the handler for the finished event and cleans up Redis state.
          #
          # @param status [Object] not used here; a fresh Status is built from opts["bid"].
          # @param opts [Hash] string-keyed options; reads "bid", "event",
          #   "origin" (with "event" and "for_bid") and "callback_bid".
          def dispatch(status, opts)
            is_callback_batch = opts['origin'].present?
            has_callback_batch = opts['callback_bid'].present?

            bid = opts['bid']
            callback_bid = opts['callback_bid']
            event = opts['event'].to_sym

            Batch.logger.debug do
              "Finalize #{event} batch id: #{bid}, callback batch id: #{callback_bid} callback_batch #{is_callback_batch}"
            end

            batch_status = Status.new(bid)
            # Dispatches to #success or #complete below, based on the event name.
            send(event, bid, batch_status, batch_status.parent_bid)

            if event == :success && !has_callback_batch
              Batch.cleanup_redis(bid)
            end

            if event == :success && is_callback_batch && opts['origin']['event'].to_sym == :success
              Batch.cleanup_redis(opts['origin']['for_bid'])
            end
          end

          # Records this batch as successful/complete on its parent and enqueues
          # the parent's :complete callbacks once the parent has finished.
          #
          # @param bid [String] id of the batch that succeeded.
          # @param status [Object] not used by this method.
          # @param parent_bid [String, nil] id of the parent batch; no-op when nil.
          def success(bid, status, parent_bid)
            return unless parent_bid

            # Results are positional: one entry per command issued in the MULTI.
            _, _, _success_count, _, _, complete, pending, children, failure = Batch.redis do |r|
              r.multi do
                r.sadd("BID-#{parent_bid}-batches-success", bid)
                r.expire("BID-#{parent_bid}-batches-success", Batch::BID_EXPIRE_TTL)
                r.scard("BID-#{parent_bid}-batches-success")
                r.srem("BID-#{parent_bid}-batches-failed", bid)
                r.sadd("BID-#{parent_bid}-batches-complete", bid)
                r.scard("BID-#{parent_bid}-batches-complete")
                # hincrby with 0 reads the counter without changing it.
                r.hincrby("BID-#{parent_bid}", "pending", 0)
                r.hincrby("BID-#{parent_bid}", "children", 0)
                r.scard("BID-#{parent_bid}-failed")
              end
            end

            # If the job finished successfully and the parent batch is completed, call the parent :complete callback.
            # The parent :success callback will be called by its :complete callback.
            if complete == children && pending == failure
              Batch.logger.debug { "Finalize parent complete bid: #{parent_bid}" }
              Batch.enqueue_callbacks(:complete, parent_bid)
            end
          end

          # Enqueues this batch's :success callbacks when it fully succeeded;
          # otherwise marks it complete/failed on the parent and enqueues the
          # parent's :complete callbacks if the parent has finished.
          #
          # @param bid [String] id of the batch that completed.
          # @param status [Object] not used by this method.
          # @param parent_bid [String, nil] id of the parent batch, if any.
          def complete(bid, status, parent_bid)
            pending, children, success_count = Batch.redis do |r|
              r.multi do
                r.hincrby("BID-#{bid}", "pending", 0)
                r.hincrby("BID-#{bid}", "children", 0)
                r.scard("BID-#{bid}-batches-success")
              end
            end

            # If the batch was successful run the :success callback, which will call the parent's :complete callback (if necessary).
            # Also, only trigger the success callback if the :complete callback_batch was successful.
            if pending.to_i.zero? && children == success_count
              Batch.enqueue_callbacks(:success, bid)

            # Otherwise check for a parent and call its :complete if needed.
            elsif parent_bid
              # If the batch was not successful, check whether its parent is complete;
              # if the parent is complete we trigger the complete callback.
              # We don't want to run this if the batch was successful because the success
              # callback may add more jobs to the parent batch.
              Batch.logger.debug { "Finalize parent complete bid: #{parent_bid}" }

              _, _, complete, pending, children, failure = Batch.redis do |r|
                r.multi do
                  r.sadd("BID-#{parent_bid}-batches-complete", bid)
                  r.sadd("BID-#{parent_bid}-batches-failed", bid)
                  r.scard("BID-#{parent_bid}-batches-complete")
                  r.hincrby("BID-#{parent_bid}", "pending", 0)
                  r.hincrby("BID-#{parent_bid}", "children", 0)
                  r.scard("BID-#{parent_bid}-failed")
                end
              end

              if complete == children && pending == failure
                Batch.enqueue_callbacks(:complete, parent_bid)
              end
            end
          end
        end
      end
    end
  end
end