module CanvasSync::JobBatches class Batch module Callback mattr_accessor :worker_class VALID_CALLBACKS = %i[success complete death stagnated].freeze module CallbackWorkerCommon def perform(definition, event, opts, bid, parent_bid) return unless VALID_CALLBACKS.include?(event.to_sym) method = nil target = :instance clazz = definition if clazz.is_a?(String) if clazz.include?('#') clazz, method = clazz.split("#") elsif clazz.include?('.') clazz, method = clazz.split(".") target = :class end end method ||= "on_#{event}" status = Batch::Status.new(bid) if clazz && object = Object.const_get(clazz) target = target == :instance ? object.new : object if target.respond_to?(method, true) target.send(method, status, opts.with_indifferent_access) else Batch.logger.warn("Invalid callback method #{definition} - #{target.to_s} does not respond to #{method}") end else Batch.logger.warn("Invalid callback method #{definition} - Class #{clazz} not found") end end end class Finalize # The methods in this class are called after all same-named callbacks have been # completed for the passed Batch. # These methods mainly handle bubbling events up to the parent Batch # You could say that they are the callbacks for callbacks. def dispatch(status, opts) bid = opts["bid"] event = opts["event"].to_sym Batch.logger.debug {"Finalize #{event} batch id: #{opts["bid"]}"} batch_status = Status.new bid send(event, bid, batch_status, batch_status.parent_bid) Batch.redis do |r| r.srem("BID-#{bid}-pending_callbacks", "#{event}-finalize") end if event == :success if opts['origin'].present? # This is a callback for a callback. In this case we need to check if we should cleanup the original bid. origin_bid = opts['origin']['for_bid'] _, pending, success_ran = Batch.redis do |r| r.multi do |r| r.srem("BID-#{origin_bid}-pending_callbacks", opts['origin']['event']) r.scard("BID-#{origin_bid}-pending_callbacks") r.hget("BID-#{origin_bid}", "success") end end Batch.cleanup_redis(origin_bid) if pending == 0 && success_ran == 'true' end if (Batch.redis {|r| r.scard("BID-#{bid}-pending_callbacks") }) == 0 Batch.cleanup_redis(bid) end end end def success(bid, status, parent_bid) return unless parent_bid Batch.with_callback_check(parent_bid) do |r| r.sadd("BID-#{parent_bid}-batches-success", bid) r.srem("BID-#{parent_bid}-batches-failed", bid) r.sadd("BID-#{parent_bid}-batches-complete", bid) end end def complete(bid, status, parent_bid) return unless parent_bid pending, children, success = Batch.redis do |r| r.multi do |r| r.hincrby("BID-#{bid}", "pending", 0) r.hincrby("BID-#{bid}", "children", 0) r.scard("BID-#{bid}-batches-success") end end if !(pending.to_i.zero? && children == success) # If batch was not successfull check and see if its parent is complete # if the parent is complete we can trigger its complete callback. # # Otherwise, we don't want to to trigger the parent's :complete here (and # we instead opt to have success tigger parent :complete) - this # allows the success callback to add additional jobs to the parent batch # before triggering :complete. Batch.with_callback_check(parent_bid, except: [:success]) do |r| r.sadd("BID-#{parent_bid}-batches-complete", bid) r.sadd("BID-#{parent_bid}-batches-failed", bid) end end end def death(bid, status, parent_bid) return unless parent_bid # We only need to bubble the event here - other events (eg stagnation) will be checked and bubbled elsewhere. Batch.enqueue_callbacks(:death, parent_bid) end def stagnated(bid, status, parent_bid) return unless parent_bid Batch.with_callback_check(parent_bid) do |r| r.sadd("BID-#{parent_bid}-batches-stagnated", bid) r.expire("BID-#{parent_bid}-batches-stagnated", BID_EXPIRE_TTL) end end end end end end