require_relative './base_job'

module CanvasSync
  module JobBatches
    # Runs a list of sub-jobs under a single root Batch while limiting how many
    # of them are in flight at once.
    #
    # The pending sub-jobs are kept in Redis under "MNGBID-<id>-jobs" (a List
    # when ordered, a Set otherwise) and bookkeeping lives in the "MNGBID-<id>"
    # hash. Each sub-job is enqueued inside its own child batch whose :success
    # callback pulls the next pending sub-job, so at most `concurrency` sub-jobs
    # are started by #perform and each success starts one more.
    class ManagedBatchJob < BaseJob
      # Stores the sub-jobs in Redis, creates the root batch and starts the
      # first `concurrency` sub-jobs.
      #
      # @param sub_jobs [Array<Hash>] job definitions; each one is handed to
      #   ChainBuilder.enqueue_job when its turn comes. NOTE: every hash gets a
      #   '_mngbid_index_' key added in place.
      # @param context [Object, nil] assigned to the root batch's context.
      # @param ordered [Boolean] truthy stores the jobs in a Redis List (popped
      #   in insertion order); falsy stores them in a Redis Set (popped in
      #   arbitrary order).
      # @param concurrency [Integer, Boolean, nil] how many sub-jobs to start
      #   immediately. 0, nil and true mean "all of them"; false means 1.
      def perform(sub_jobs, context: nil, ordered: true, concurrency: nil)
        man_batch_id = SecureRandom.urlsafe_base64(10)
        key = "MNGBID-#{man_batch_id}"
        jobs_key = "#{key}-jobs"

        concurrency =
          case concurrency
          when 0, nil, true then sub_jobs.count
          when false then 1
          else concurrency
          end

        # Serialise before touching Redis so a serialisation failure cannot
        # leave a half-built transaction behind.
        serialized_jobs = sub_jobs.each_with_index.map do |job, index|
          # A unique index keeps otherwise-identical jobs distinct when they
          # are stored in a Redis Set.
          job['_mngbid_index_'] = index
          JSON.generate(ActiveJob::Arguments.serialize([job]))
        end

        root_batch = Batch.new

        Batch.redis do |r|
          # Use the object yielded by `multi`: issuing commands on the outer
          # connection inside the block is deprecated/removed in newer redis-rb.
          r.multi do |tx|
            tx.hset(key, "root_bid", root_batch.bid)
            # Stored as an explicit String: Redis only holds strings, and newer
            # clients reject raw booleans as command arguments.
            tx.hset(key, "ordered", ordered ? "true" : "false")
            tx.hset(key, "concurrency", concurrency)
            tx.expire(key, Batch::BID_EXPIRE_TTL)

            # RPUSH/SADD with no values is a Redis protocol error, so skip the
            # queue entirely when there is nothing to run.
            unless serialized_jobs.empty?
              if ordered
                tx.rpush(jobs_key, serialized_jobs)
              else
                tx.sadd(jobs_key, serialized_jobs)
              end
              tx.expire(jobs_key, Batch::BID_EXPIRE_TTL)
            end
          end
        end

        root_batch.description = "Managed Batch Root (#{man_batch_id})"
        root_batch.allow_context_changes = (concurrency == 1)
        root_batch.context = context
        root_batch.on(:success, "#{self.class}.cleanup_redis", managed_batch_id: man_batch_id)
        # Open and close the root batch with no direct jobs; the child batches
        # are attached to it later by perform_next_sequence_job.
        root_batch.jobs {}

        concurrency.times do
          self.class.perform_next_sequence_job(man_batch_id)
        end
      end

      # Root batch :success callback. Deletes the Redis keys owned by the
      # managed batch.
      #
      # @param status [Object] batch status passed by the callback machinery (unused).
      # @param options [Hash] must contain 'managed_batch_id'.
      def self.cleanup_redis(status, options)
        man_batch_id = options['managed_batch_id']
        Batch.redis do |r|
          r.del(
            "MNGBID-#{man_batch_id}",
            "MNGBID-#{man_batch_id}-jobs",
          )
        end
      end

      # Child batch :success callback. Starts the next pending sub-job, if any.
      #
      # @param status [Object] batch status passed by the callback machinery (unused).
      # @param options [Hash] must contain 'managed_batch_id'.
      def self.job_succeeded_callback(status, options)
        man_batch_id = options['managed_batch_id']
        perform_next_sequence_job(man_batch_id)
      end

      # Pops the next pending sub-job from Redis and enqueues it in a new child
      # batch nested under the root batch. Does nothing when the queue is empty.
      #
      # This is intentionally a public class method: it is called with an
      # explicit receiver from #perform, and a bare `protected` would not apply
      # to `def self.` methods anyway.
      #
      # @param man_batch_id [String] id generated by #perform.
      def self.perform_next_sequence_job(man_batch_id)
        key = "MNGBID-#{man_batch_id}"
        jobs_key = "#{key}-jobs"

        root_bid, ordered_flag = Batch.redis do |r|
          r.multi do |tx|
            tx.hget(key, "root_bid")
            tx.hget(key, "ordered")
          end
        end

        # HGET returns a String (or nil); "false" is truthy in Ruby, so the
        # flag has to be parsed rather than used directly.
        ordered = ordered_flag.to_s == "true"

        next_job_json = Batch.redis do |r|
          if ordered
            r.lpop(jobs_key)
          else
            r.spop(jobs_key)
          end
        end

        return unless next_job_json.present?

        next_job = JSON.parse(next_job_json)
        next_job = ActiveJob::Arguments.deserialize(next_job)[0]

        Batch.new(root_bid).jobs do
          Batch.new.tap do |batch|
            batch.description = "Managed Batch Fiber (#{man_batch_id})"
            batch.on(:success, "#{self}.job_succeeded_callback", managed_batch_id: man_batch_id)
            batch.jobs do
              ChainBuilder.enqueue_job(next_job)
            end
          end
        end
      end
    end
  end
end