require 'json'
require 'securerandom'
require_relative './base_job'

module CanvasSync
  module JobBatches
    # Runs a list of jobs one after another instead of in parallel.
    #
    # The serialized jobs are kept in a Redis list ("SERBID-<id>-jobs") and the
    # bid of a root Batch is kept in a Redis hash ("SERBID-<id>"). Each job is
    # enqueued inside its own Batch nested in the root Batch; that Batch's
    # :success callback pops and enqueues the next job. The root Batch's
    # :success callback deletes both Redis keys.
    class SerialBatchJob < BaseJob
      # Stores the job list in Redis, sets up the root Batch and starts the
      # first job of the sequence.
      #
      # @param sub_jobs [Array] job descriptions; each one must be serializable
      #   by ActiveJob::Arguments and is later handed to ChainBuilder.enqueue_job
      # @param context [Object, nil] value assigned to the root Batch's context
      # @return [void]
      def perform(sub_jobs, context: nil)
        serial_id = SecureRandom.urlsafe_base64(10)
        meta_key = "SERBID-#{serial_id}"
        jobs_key = "SERBID-#{serial_id}-jobs"

        root_batch = Batch.new

        # Serialize before touching Redis so a serialization failure never
        # happens in the middle of a transaction.
        serialized_jobs = sub_jobs.map do |job|
          JSON.generate(ActiveJob::Arguments.serialize([job]))
        end

        # NOTE(review): an empty sub_jobs list makes RPUSH receive no values,
        # which Redis rejects - confirm callers never pass an empty list.
        Batch.redis do |r|
          # Commands are issued on the object yielded by #multi: redis-rb < 5
          # yields the connection itself, redis-rb >= 5 yields the transaction
          # and no longer queues commands sent to the outer connection.
          r.multi do |tx|
            tx.hset(meta_key, "root_bid", root_batch.bid)
            tx.expire(meta_key, Batch::BID_EXPIRE_TTL)
            tx.rpush(jobs_key, serialized_jobs)
            tx.expire(jobs_key, Batch::BID_EXPIRE_TTL)
          end
        end

        root_batch.description = "Serial Batch Root (#{serial_id})"
        root_batch.allow_context_changes = true
        root_batch.context = context
        root_batch.on(:success, "#{self.class}.cleanup_redis", serial_batch_id: serial_id)
        # Empty block on purpose: the jobs are attached later, one at a time,
        # by perform_next_sequence_job.
        # NOTE(review): presumably this persists the batch - confirm in Batch#jobs.
        root_batch.jobs {}

        self.class.perform_next_sequence_job(serial_id)
      end

      # :success callback of the root Batch; removes the Redis keys that hold
      # the sequence state.
      #
      # @param status [Object] batch status passed by the callback (unused)
      # @param options [Hash] callback options; reads 'serial_batch_id'
      # @return [void]
      def self.cleanup_redis(status, options)
        serial_id = options['serial_batch_id']
        Batch.redis do |r|
          r.del("SERBID-#{serial_id}", "SERBID-#{serial_id}-jobs")
        end
      end

      # :success callback of each per-job Batch; starts the next job in the
      # sequence.
      #
      # @param status [Object] batch status passed by the callback (unused)
      # @param options [Hash] callback options; reads 'serial_batch_id'
      # @return [void]
      def self.job_succeeded_callback(status, options)
        perform_next_sequence_job(options['serial_batch_id'])
      end

      # Pops the next serialized job off the Redis list and enqueues it in a
      # new Batch nested inside the root Batch. Does nothing when the list is
      # empty.
      #
      # This method is public: a bare `protected` does not apply to `def self.`
      # methods, and #perform calls it through `self.class`, so it must stay
      # callable with an explicit receiver.
      #
      # @param serial_id [String] id generated in #perform
      # @return [void]
      def self.perform_next_sequence_job(serial_id)
        root_bid, next_job_json = Batch.redis do |r|
          # See #perform for why the yielded object is used here.
          r.multi do |tx|
            tx.hget("SERBID-#{serial_id}", "root_bid")
            tx.lpop("SERBID-#{serial_id}-jobs")
          end
        end

        return unless next_job_json.present?

        next_job = ActiveJob::Arguments.deserialize(JSON.parse(next_job_json)).first

        Batch.new(root_bid).jobs do
          Batch.new.tap do |batch|
            batch.description = "Serial Batch Fiber (#{serial_id})"
            batch.on(:success, "#{self}.job_succeeded_callback", serial_batch_id: serial_id)
            batch.jobs do
              ChainBuilder.enqueue_job(next_job)
            end
          end
        end
      end
    end
  end
end