require_relative './base_job'

module CanvasSync::JobBatches
  class ManagedBatchJob < BaseJob
    # Builds a root Batch that runs +sub_jobs+ with a bounded number of concurrent
    # "fiber" batches. When +concurrency+ is less than the number of sub-jobs, the
    # remaining jobs are staged in Redis and dequeued one at a time as each fiber
    # batch succeeds.
    def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, desc_prefix: nil, &blk)
      desc_prefix ||= ''

      if concurrency == 0 || concurrency.nil? || concurrency == true
        concurrency = sub_jobs.count
      elsif concurrency == false
        concurrency = 1
      end

      root_batch = Batch.new
      man_batch_id = nil

      if concurrency < sub_jobs.count
        man_batch_id = SecureRandom.urlsafe_base64(10)

        Batch.redis do |r|
          r.multi do |r|
            r.hset("MNGBID-#{man_batch_id}", "root_bid", root_batch.bid)
            r.hset("MNGBID-#{man_batch_id}", "ordered", ordered ? 1 : 0)
            r.hset("MNGBID-#{man_batch_id}", "concurrency", concurrency)
            r.hset("MNGBID-#{man_batch_id}", "preflight_check", preflight_check) if preflight_check.present?
            r.expire("MNGBID-#{man_batch_id}", Batch::BID_EXPIRE_TTL)

            mapped_sub_jobs = sub_jobs.each_with_index.map do |j, i|
              j['_mngbid_index_'] = i # This allows duplicate jobs when a Redis Set is used
              j = ::ActiveJob::Arguments.serialize([j])
              JSON.unparse(j)
            end

            if ordered
              r.rpush("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
            else
              r.sadd("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
            end
            r.expire("MNGBID-#{man_batch_id}-jobs", Batch::BID_EXPIRE_TTL)
          end
        end

        root_batch.allow_context_changes = (concurrency == 1)
        root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)

        desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
      end

      root_batch.context = context
      blk.call(ManagedBatchProxy.new(root_batch)) if blk.present?
      root_batch.description = "#{desc_prefix}#{root_batch.description || 'Root'}"
      root_batch.context["managed_batch_bid"] = man_batch_id if man_batch_id

      if concurrency < sub_jobs.count
        root_batch.placeholder!

        # Kick off the first +concurrency+ jobs; the rest are pulled from Redis as
        # each fiber batch succeeds (see job_succeeded_callback).
        concurrency.times do
          perform_next_sequence_job(man_batch_id, skip_preflight: true)
        end
      else
        # No throttling needed - enqueue everything directly under the root batch.
        root_batch.jobs do
          sub_jobs.each do |j|
            ChainBuilder.enqueue_job(j)
          end
        end
      end

      root_batch
    end

    def perform(sub_jobs, **kwargs)
      self.class.make_batch(sub_jobs, **kwargs)
    end

    # :success callback on the root batch - removes the managed-batch bookkeeping keys.
    def self.cleanup_redis(status, options)
      man_batch_id = options['managed_batch_id']
      Batch.redis do |r|
        r.del(
          "MNGBID-#{man_batch_id}",
          "MNGBID-#{man_batch_id}-jobs",
        )
      end
    end

    # :success callback on each fiber batch - dequeues and starts the next staged job.
    def self.job_succeeded_callback(status, options)
      man_batch_id = options['managed_batch_id']
      perform_next_sequence_job(man_batch_id)
    end

    protected

    def self.perform_next_sequence_job(man_batch_id, skip_preflight: false)
      root_bid, ordered, preflight_check = Batch.redis do |r|
        r.multi do |r|
          r.hget("MNGBID-#{man_batch_id}", "root_bid")
          r.hget("MNGBID-#{man_batch_id}", "ordered")
          r.hget("MNGBID-#{man_batch_id}", "preflight_check")
        end
      end

      # Resolve the configured preflight check ("Module.method" or a bare method on
      # Object) into a callable; default to a check that always passes.
      if !skip_preflight && preflight_check.present?
        if preflight_check.include?(".")
          clazz, method_name = preflight_check.split('.')
          clazz = clazz.constantize
        else
          clazz = Object
          method_name = preflight_check
        end
        preflight_check = ->(*args) { clazz.send(method_name, *args) }
      else
        preflight_check = ->(*args) { true }
      end

      ordered = CanvasSync::MiscHelper.to_boolean(ordered)

      loop do
        next_job_json = Batch.redis do |r|
          if ordered
            r.lpop("MNGBID-#{man_batch_id}-jobs")
          else
            r.spop("MNGBID-#{man_batch_id}-jobs")
          end
        end
        break unless next_job_json.present?
        next_job = JSON.parse(next_job_json)
        next_job = ::ActiveJob::Arguments.deserialize(next_job)[0]

        preflight_result = preflight_check.call(next_job)
        if preflight_result == :abort
          cleanup_redis(nil, { "managed_batch_id" => man_batch_id })
          break
        elsif !preflight_result
          # Preflight rejected this job - skip it and try the next staged one.
          next
        end

        Batch.new(root_bid).jobs do
          Batch.new.tap do |batch|
            batch.description = "Managed Batch Fiber (#{man_batch_id})"
            batch.on(:success, "#{self.to_s}.job_succeeded_callback", managed_batch_id: man_batch_id)
            if next_job[:chain_link].present?
              # Annotate Batch with chain-step info
              batch.context["csb:chain_link"] = next_job[:chain_link]
              # TODO: Add Fiber Batch to chain-link
              # With the exception of the top of the Chain, all nested ManagedBatch Roots should be within a Fiber,
              # so we shouldn't really need to make the Root checkin with the chain
              # ...except to cleanup the chain
              batch.on(:complete, "#{ChainBuilder.to_s}.chain_step_complete", chain_link: next_job[:chain_link])
            end
            batch.jobs do
              ChainBuilder.enqueue_job(next_job)
            end
          end
        end

        break
      end
    end

    class ManagedBatchProxy
      def initialize(real_batch)
        @real_batch = real_batch
      end

      delegate_missing_to :real_batch

      def jobs
        raise "Managed Batches do not support calling .jobs directly!"
      end

      private

      attr_reader :real_batch
    end
  end
end
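
# Illustrative usage sketch, not part of this file's behavior: the job classes,
# the sub-job descriptor shape, and the preflight method below are assumptions;
# ChainBuilder defines the real descriptor format accepted by enqueue_job.
#
#   CanvasSync::JobBatches::ManagedBatchJob.make_batch(
#     [
#       { job: SomeSyncJob, parameters: [account_id] },    # hypothetical descriptors
#       { job: AnotherSyncJob, parameters: [account_id] },
#     ],
#     ordered: true,
#     concurrency: 2,                                # at most two fiber batches at once
#     preflight_check: "SyncGuards.can_enqueue?",    # "Module.method", resolved via constantize
#   ) do |batch|
#     batch.description = "Nightly account sync"
#   end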