module CanvasSync
  module JobBatches
    class Batch
      module Callback
        # Batch events for which a callback worker will run a callback.
        VALID_CALLBACKS = %w[success complete dead].freeze

        # Shared +perform+ implementation for the callback workers below.
        module CallbackWorkerCommon
          # Resolves a callback definition and invokes it with the batch status.
          #
          # A String +definition+ may take one of three forms:
          # * "Klass"        - instantiates Klass and calls +on_<event>+
          # * "Klass#method" - instantiates Klass and calls +method+
          # * "Klass.method" - calls the class-level +method+ on Klass
          #
          # @param definition [String] callback definition (see above)
          # @param event [String] event name; ignored unless in VALID_CALLBACKS
          # @param opts [Object] passed through to the callback unchanged
          # @param bid [String] id used to build the Batch::Status handed to the callback
          # @param parent_bid [String, nil] not used by this method
          def perform(definition, event, opts, bid, parent_bid)
            return unless VALID_CALLBACKS.include?(event)

            method = nil
            target = :instance
            clazz = definition
            if clazz.is_a?(String)
              if clazz.include?('#')
                clazz, method = clazz.split("#")
              elsif clazz.include?('.')
                clazz, method = clazz.split(".")
                target = :class
              end
            end

            method ||= "on_#{event}"
            status = Batch::Status.new(bid)

            # NOTE: Object.const_get raises NameError for an unknown constant, so
            # the "not found" warning below only covers a nil/false definition
            # (or a constant whose value is falsy). A missing class propagates
            # as an exception to the job runner.
            object = clazz && Object.const_get(clazz)
            if object
              target = target == :instance ? object.new : object
              if target.respond_to?(method)
                # respond_to? only considers public methods; dispatch accordingly.
                target.public_send(method, status, opts)
              else
                Batch.logger.warn("Invalid callback method #{definition} - #{target.to_s} does not respond to #{method}")
              end
            else
              Batch.logger.warn("Invalid callback method #{definition} - Class #{clazz} not found")
            end
          end
        end

        # Callback worker backed by ActiveJob.
        class ActiveJobCallbackWorker < ActiveJob::Base
          include CallbackWorkerCommon

          # Enqueues one job per argument set on the given queue.
          #
          # @param args [Array<Array>] one array of +perform+ arguments per job
          # @param queue [String, Symbol] queue to enqueue on
          def self.enqueue_all(args, queue)
            args.each do |arg_set|
              set(queue: queue).perform_later(*arg_set)
            end
          end
        end

        if defined?(::Sidekiq)
          # Callback worker backed by Sidekiq; only defined when Sidekiq is loaded.
          class SidekiqCallbackWorker
            include ::Sidekiq::Worker
            include CallbackWorkerCommon

            # Pushes all argument sets to Sidekiq in a single bulk call.
            # Does nothing when +args+ is empty.
            #
            # @param args [Array<Array>] one array of +perform+ arguments per job
            # @param queue [String] queue to enqueue on
            def self.enqueue_all(args, queue)
              return if args.empty?

              ::Sidekiq::Client.push_bulk(
                'class' => self,
                'args' => args,
                'queue' => queue
              )
            end
          end

          Worker = SidekiqCallbackWorker
        else
          Worker = ActiveJobCallbackWorker
        end

        # Callback target that runs the bookkeeping for a finished batch:
        # updates the parent batch's Redis sets, enqueues follow-up callbacks
        # and cleans up Redis keys.
        class Finalize
          # Entry point: runs the handler named by opts["event"] for the batch
          # opts["bid"], then cleans up Redis keys.
          #
          # @param status [#bid] status of the batch the callback ran in
          # @param opts [Hash] must contain "bid" and "event"
          def dispatch(status, opts)
            bid = opts["bid"]
            callback_bid = status.bid
            event = opts["event"].to_sym
            callback_batch = bid != callback_bid

            Batch.logger.debug {"Finalize #{event} batch id: #{opts["bid"]}, callback batch id: #{callback_bid} callback_batch #{callback_batch}"}

            batch_status = Status.new bid
            send(event, bid, batch_status, batch_status.parent_bid)

            # Different events are run in different callback batches
            Batch.cleanup_redis callback_bid if callback_batch
            Batch.cleanup_redis bid if event == :success
          end

          # Records +bid+ as successful and complete on its parent and enqueues
          # the parent's :complete callbacks once the parent is finished.
          # Does nothing when the batch has no parent.
          #
          # @param bid [String] id of the batch that succeeded
          # @param status [Object] not used by this method
          # @param parent_bid [String, nil] id of the parent batch, if any
          def success(bid, status, parent_bid)
            return unless parent_bid

            # Commands must be issued on the object yielded by #multi: redis-rb
            # deprecated calling the outer client inside the block (4.6) and
            # newer versions only queue commands sent to the yielded object.
            _, _, _success, _, complete, pending, children, failure = Batch.redis do |r|
              r.multi do |multi|
                multi.sadd("BID-#{parent_bid}-success", bid)
                multi.expire("BID-#{parent_bid}-success", Batch::BID_EXPIRE_TTL)
                multi.scard("BID-#{parent_bid}-success")
                multi.sadd("BID-#{parent_bid}-complete", bid)
                multi.scard("BID-#{parent_bid}-complete")
                # hincrby with 0 reads the counter as an Integer
                multi.hincrby("BID-#{parent_bid}", "pending", 0)
                multi.hincrby("BID-#{parent_bid}", "children", 0)
                multi.scard("BID-#{parent_bid}-failed")
              end
            end

            # If the job finished successfully and the parent batch completed,
            # call the parent complete callback.
            # Success callback is called after complete callback
            if complete == children && pending == failure
              Batch.logger.debug {"Finalize parent complete bid: #{parent_bid}"}
              Batch.enqueue_callbacks(:complete, parent_bid)
            end
          end

          # Enqueues the batch's :success callbacks when it has no pending jobs
          # and every child succeeded; otherwise records the batch as complete
          # on its parent (if any) and enqueues the parent's :complete callbacks
          # once the parent is finished.
          #
          # @param bid [String] id of the batch that completed
          # @param status [Object] not used by this method
          # @param parent_bid [String, nil] id of the parent batch, if any
          def complete(bid, status, parent_bid)
            pending, children, success = Batch.redis do |r|
              r.multi do |multi|
                multi.hincrby("BID-#{bid}", "pending", 0)
                multi.hincrby("BID-#{bid}", "children", 0)
                multi.scard("BID-#{bid}-success")
              end
            end

            # If the batch was successful, run the success callback
            if pending.to_i.zero? && children == success
              Batch.enqueue_callbacks(:success, bid)

            elsif parent_bid
              # If the batch was not successful, check whether its parent is complete;
              # if the parent is complete we trigger the complete callback.
              # We don't want to run this if the batch was successful because the success
              # callback may add more jobs to the parent batch

              Batch.logger.debug {"Finalize parent complete bid: #{parent_bid}"}
              _, complete, pending, children, failure = Batch.redis do |r|
                r.multi do |multi|
                  multi.sadd("BID-#{parent_bid}-complete", bid)
                  multi.scard("BID-#{parent_bid}-complete")
                  multi.hincrby("BID-#{parent_bid}", "pending", 0)
                  multi.hincrby("BID-#{parent_bid}", "children", 0)
                  multi.scard("BID-#{parent_bid}-failed")
                end
              end
              if complete == children && pending == failure
                Batch.enqueue_callbacks(:complete, parent_bid)
              end
            end
          end

          # Cleans up the Redis keys for +bid+ and, when given, +callback_bid+.
          #
          # @param bid [String] batch id to clean up
          # @param callback_bid [String, nil] optional callback batch id to clean up
          def cleanup_redis(bid, callback_bid = nil)
            Batch.cleanup_redis bid
            Batch.cleanup_redis callback_bid if callback_bid
          end
        end
      end
    end
  end
end