require_relative './base_job'

module CanvasSync
  module JobBatches
    # Runs a list of sub-jobs under a single root Batch, optionally limiting how
    # many of them are in flight at the same time.
    #
    # When the requested concurrency is lower than the number of sub-jobs, the
    # serialized jobs are parked in Redis (a List when +ordered+, a Set
    # otherwise) and handed out one at a time: every dispatched job is wrapped
    # in its own child batch whose :success callback pulls the next job.
    # Otherwise all sub-jobs are enqueued directly inside the root batch.
    class ManagedBatchJob < BaseJob
      # Builds the root batch and starts the first round of jobs.
      #
      # @param sub_jobs [Array<Hash>] job definitions understood by
      #   ChainBuilder.enqueue_job
      # @param ordered [Boolean] whether parked jobs are popped in insertion
      #   order (Redis List) or in arbitrary order (Redis Set)
      # @param concurrency [Integer, Boolean, nil] nil, true and 0 mean "run
      #   everything at once"; false means one job at a time
      # @param context [Object] value assigned to the root batch's context
      # @param desc_prefix [String, nil] prefix for the root batch description
      # @yield [ManagedBatchProxy] the root batch, wrapped so that +jobs+ cannot
      #   be called on it directly
      def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, desc_prefix: nil, &blk)
        desc_prefix ||= ''

        concurrency =
          case concurrency
          when nil, true, 0 then sub_jobs.count
          when false then 1
          else concurrency
          end

        root_batch = Batch.new
        managed = concurrency < sub_jobs.count

        if managed
          man_batch_id = SecureRandom.urlsafe_base64(10)
          meta_key = "MNGBID-#{man_batch_id}"
          jobs_key = "#{meta_key}-jobs"

          # Serialize up front, without mutating the caller's hashes. The index
          # makes otherwise-identical jobs distinct, which allows duplicate
          # jobs when a Redis Set is used.
          serialized_jobs = sub_jobs.each_with_index.map do |job, index|
            indexed_job = job.merge('_mngbid_index_' => index)
            JSON.unparse(ActiveJob::Arguments.serialize([indexed_job]))
          end

          Batch.redis do |r|
            # Issue the commands on the yielded transaction object rather than
            # on the outer connection.
            r.multi do |m|
              m.hset(meta_key, "root_bid", root_batch.bid)
              # Redis only stores strings, so write the flag explicitly; it is
              # compared against "true" when read back.
              m.hset(meta_key, "ordered", ordered ? "true" : "false")
              m.hset(meta_key, "concurrency", concurrency)
              m.expire(meta_key, Batch::BID_EXPIRE_TTL)

              if ordered
                m.rpush(jobs_key, serialized_jobs)
              else
                m.sadd(jobs_key, serialized_jobs)
              end
              m.expire(jobs_key, Batch::BID_EXPIRE_TTL)
            end
          end

          root_batch.allow_context_changes = (concurrency == 1)
          root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)

          desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
        end

        root_batch.context = context

        blk.call(ManagedBatchProxy.new(root_batch)) if blk

        root_batch.description = "#{desc_prefix}: #{root_batch.description || 'Root'}"

        if managed
          # NOTE(review): the empty block presumably exists to initialize the
          # root batch before child batches are attached to it - confirm
          # against Batch#jobs.
          root_batch.jobs {}
          concurrency.times { perform_next_sequence_job(man_batch_id) }
        else
          root_batch.jobs do
            sub_jobs.each { |job| ChainBuilder.enqueue_job(job) }
          end
        end
      end

      # Job entry point; forwards its arguments to .make_batch.
      def perform(sub_jobs, context: nil, ordered: true, concurrency: nil)
        self.class.make_batch(sub_jobs, ordered: ordered, concurrency: concurrency, context: context)
      end

      # Root batch :success callback: deletes the Redis keys of a managed batch.
      #
      # @param status [Object] batch status supplied by the callback (unused)
      # @param options [Hash] must contain 'managed_batch_id'
      def self.cleanup_redis(status, options)
        man_batch_id = options['managed_batch_id']
        Batch.redis do |r|
          r.del(
            "MNGBID-#{man_batch_id}",
            "MNGBID-#{man_batch_id}-jobs",
          )
        end
      end

      # Child batch :success callback: dispatches the next parked job, if any.
      #
      # @param status [Object] batch status supplied by the callback (unused)
      # @param options [Hash] must contain 'managed_batch_id'
      def self.job_succeeded_callback(status, options)
        man_batch_id = options['managed_batch_id']
        perform_next_sequence_job(man_batch_id)
      end

      # Pops the next parked job of the managed batch and enqueues it inside a
      # fresh child batch of the root batch. Does nothing when no job is left.
      #
      # This is a singleton method, so an instance-level `protected` keyword
      # would not restrict it; it is callable with an explicit receiver.
      #
      # @param man_batch_id [String] id generated by .make_batch
      def self.perform_next_sequence_job(man_batch_id)
        meta_key = "MNGBID-#{man_batch_id}"
        jobs_key = "#{meta_key}-jobs"

        root_bid, ordered_flag = Batch.redis do |r|
          r.multi do |m|
            m.hget(meta_key, "root_bid")
            m.hget(meta_key, "ordered")
          end
        end

        # HGET returns a String (or nil); "false" would be truthy, so compare
        # against the stored literal instead of relying on truthiness.
        ordered = ordered_flag == "true"

        next_job_json = Batch.redis do |r|
          ordered ? r.lpop(jobs_key) : r.spop(jobs_key)
        end

        return unless next_job_json.present?

        next_job = ActiveJob::Arguments.deserialize(JSON.parse(next_job_json))[0]

        Batch.new(root_bid).jobs do
          Batch.new.tap do |batch|
            batch.description = "Managed Batch Fiber (#{man_batch_id})"
            batch.on(:success, "#{self.to_s}.job_succeeded_callback", managed_batch_id: man_batch_id)
            batch.jobs do
              ChainBuilder.enqueue_job(next_job)
            end
          end
        end
      end

      # Wrapper handed to the block given to .make_batch. It forwards every
      # call to the real batch except +jobs+, which always raises.
      class ManagedBatchProxy
        def initialize(real_batch)
          @real_batch = real_batch
        end

        delegate_missing_to :real_batch

        def jobs
          raise "Managed Batches do not support calling .jobs directly!"
        end

        private

        attr_reader :real_batch
      end
    end
  end
end