module CanvasSync
  module JobBatches
    # Builds a tree of job definitions (a "chain") and enqueues its root job.
    #
    # A job definition is a Hash shaped like
    #   { job: <Class or String>, parameters: [...], options: {...} }
    # where :options is optional (legacy). Job types registered through
    # ChainBuilder.register_chain_job hold a nested Array of job definitions
    # (their sub-chain) in one of their parameters. Entries of a sub-chain may
    # be Hashes or other ChainBuilder instances; #normalize! flattens the
    # latter into plain Hashes.
    class ChainBuilder
      VALID_PLACEMENT_PARAMETERS = %i[before after with].freeze

      attr_reader :base_job

      # @param base_type [Class, String, Hash] a job type to wrap in a fresh
      #   definition, or an existing job-definition Hash to adopt as-is
      def initialize(base_type = SerialBatchJob)
        @base_job =
          if base_type.is_a?(Hash)
            base_type
          else
            { job: base_type, parameters: [] }
          end
      end

      # Normalizes the chain and enqueues the root job.
      def process!
        normalize!
        self.class.enqueue_job(base_job)
      end

      # With a Class, returns the sub-chain for that job type (see
      # #get_sub_chain); with any other key, reads that key from the root
      # job definition.
      def [](key)
        if key.is_a?(Class)
          get_sub_chain(key)
        else
          @base_job[key]
        end
      end

      # @return [ParamsMapper] accessor over the root job's :parameters
      def params
        ParamsMapper.new(self[:parameters])
      end

      # Appends a job (or an Array of jobs) to the end of the root chain.
      def <<(new_job)
        insert_at(-1, new_job)
      end

      # Inserts one or more jobs into the root chain at the given position.
      #
      # @param position [Integer] index as understood by Array#insert
      #   (-1 appends to the end)
      # @param new_jobs [Hash, ChainBuilder, Array] job definition(s)
      # @return [Array] the root chain
      def insert_at(position, new_jobs)
        chain = self.class.get_chain_parameter(base_job)
        # Kernel#Array is deliberately not used: it would explode a
        # job-definition Hash into key/value pairs.
        new_jobs = [new_jobs] unless new_jobs.is_a?(Array)
        chain.insert(position, *new_jobs)
      end

      # Inserts one or more jobs, optionally relative to an existing job.
      #
      # Without a placement the jobs are appended to the root chain. With
      # `before:`/`after:` they are placed next to the referenced job inside a
      # SerialBatchJob; with `with:` they are placed alongside it inside a
      # ConcurrentBatchJob. When the referenced job's parent is not of the
      # needed type, the referenced job is first wrapped in a new batch of
      # that type.
      #
      # @param new_jobs [Hash, ChainBuilder, Array] job definition(s)
      # @param kwargs [Hash] at most one non-nil entry out of
      #   VALID_PLACEMENT_PARAMETERS, whose value names the job to place
      #   relative to; nil-valued entries are ignored
      # @raise [RuntimeError] on unknown or multiple placement parameters, or
      #   when the referenced job is missing or ambiguous
      # @return [Array] the chain that received the new jobs
      def insert(new_jobs, **kwargs)
        invalid_params = kwargs.keys - VALID_PLACEMENT_PARAMETERS
        raise "Invalid placement parameters: #{invalid_params.map(&:to_s).join(', ')}" if invalid_params.present?

        # Drop nil placements so the entry that is used is the same one the
        # "at most one" validation counted.
        placements = kwargs.compact
        raise "At most one placement parameter may be provided" if placements.length > 1

        new_jobs = [new_jobs] unless new_jobs.is_a?(Array)
        return insert_at(-1, new_jobs) if placements.empty?

        placement, relative_to = placements.first

        # Materialize the enumerator: Enumerator has no #[].
        matching_jobs = find_matching_jobs(relative_to).to_a
        raise "Could not find a \"#{relative_to}\" job in the chain" if matching_jobs.count == 0
        raise "Found multiple \"#{relative_to}\" jobs in the chain" if matching_jobs.count > 1

        _relative_job, parent_job, sub_index = matching_jobs.first
        needed_parent_type = placement == :with ? ConcurrentBatchJob : SerialBatchJob

        chain = self.class.get_chain_parameter(parent_job)

        # Compare by name: after #normalize! job types are stored as Strings.
        if parent_job[:job].to_s != needed_parent_type.to_s
          old_job = chain[sub_index]

          # Replace the referenced job with a wrapper batch of the needed
          # type and move the referenced job into it.
          parent_job = chain[sub_index] = {
            job: needed_parent_type,
            parameters: [],
          }
          sub_index = 0
          chain = self.class.get_chain_parameter(parent_job)
          chain << old_job
        end

        if placement == :with
          chain.insert(-1, *new_jobs)
        else
          sub_index += 1 if placement == :after
          chain.insert(sub_index, *new_jobs)
        end
      end

      # Finds the single job of the given type in the chain and returns it
      # wrapped in a ChainBuilder.
      #
      # @param sub_type [Class, String] job type to look for
      # @return [ChainBuilder, nil] nil when no such job exists
      # @raise [RuntimeError] when more than one job matches
      def get_sub_chain(sub_type)
        matching_jobs = find_matching_jobs(sub_type).to_a
        raise "Found multiple \"#{sub_type}\" jobs in the chain" if matching_jobs.count > 1
        return nil if matching_jobs.empty?

        sub_job = matching_jobs.first.first
        sub_job.is_a?(ChainBuilder) ? sub_job : self.class.new(sub_job)
      end

      # Recursively converts every :job entry to a String and replaces nested
      # ChainBuilder instances with their (normalized) job-definition Hash.
      #
      # @return [Hash] the normalized job definition
      def normalize!(job_def = self.base_job)
        if job_def.is_a?(ChainBuilder)
          job_def.normalize!
        else
          job_def[:job] = job_def[:job].to_s
          if (chain = self.class.get_chain_parameter(job_def, raise_error: false)).present?
            chain.map! { |sub_job| normalize!(sub_job) }
          end
          job_def
        end
      end

      # Legacy Support
      # Deep-merges `options` into the :options Hash of every job in the chain
      # whose type matches `job`.
      def merge_options(job, options)
        find_matching_jobs(job).each do |sub_job, _parent_job, _index|
          # A nested ChainBuilder has no #[]=; write to its underlying Hash.
          target = sub_job.is_a?(ChainBuilder) ? sub_job.base_job : sub_job
          target[:options] ||= {}
          target[:options].deep_merge!(options)
        end
      end

      private

      # Walks the chain depth-first and yields `[job, parent_job, index]` for
      # every job whose type name equals `search_job.to_s`. A matching job is
      # not descended into. Returns an Enumerator when no block is given.
      def find_matching_jobs(search_job, parent_job = self.base_job)
        return to_enum(:find_matching_jobs, search_job, parent_job) unless block_given?

        sub_jobs = self.class.get_chain_parameter(parent_job)
        sub_jobs.each_with_index do |sub_job, i|
          if sub_job[:job].to_s == search_job.to_s
            yield [sub_job, parent_job, i]
          elsif self.class._job_type_definitions[sub_job[:job].to_s]
            # The registry is keyed by String; descend into this sub-job's
            # own chain (not back into the root).
            find_matching_jobs(search_job, sub_job) { |item| yield item }
          end
        end
      end

      # Returns the job definition that directly contains `job_def`, or nil
      # when `job_def` is the root or is not part of the tree. Matches by
      # object identity so that two structurally identical definitions are
      # not confused with each other.
      def find_parent_job(job_def)
        iterate_job_tree do |job, path|
          return path[-1] if job.equal?(job_def)
        end

        nil
      end

      # Calls `blk` with `(job, ancestors)` for `root` and every job below it,
      # depth-first; `ancestors` is ordered from the outermost job inwards.
      def iterate_job_tree(root: self.base_job, path: [], &blk)
        blk.call(root, path)

        # The registry is keyed by String, while :job may still be a Class.
        return unless self.class._job_type_definitions[root[:job].to_s]

        sub_jobs = self.class.get_chain_parameter(root)
        sub_jobs.each do |sub_job|
          iterate_job_tree(root: sub_job, path: [*path, root], &blk)
        end
      end

      class << self
        # Registry of job types that accept a sub-chain, keyed by class name.
        def _job_type_definitions
          @job_type_definitions ||= {}
        end

        # Registers `job_class` as a job type that carries a sub-chain.
        #
        # @param job_class [Class, String] the job type
        # @param chain_parameter [Integer, Object] where the sub-chain lives
        #   in the job's parameters: a Numeric is a positional index, anything
        #   else is a key in the trailing keyword Hash (see ParamsMapper)
        # @param options [Hash] extra attributes stored with the definition
        def register_chain_job(job_class, chain_parameter, **options)
          _job_type_definitions[job_class.to_s] = {
            **options,
            chain_parameter: chain_parameter,
          }
        end

        # Returns the sub-chain Array of `job_def`, creating an empty one in
        # its parameters when none is present yet.
        #
        # @param job_def [Hash, ChainBuilder] the job definition
        # @param raise_error [Boolean] when false, return nil instead of
        #   raising for job types that are not registered as chain jobs
        # @raise [RuntimeError] when the job type accepts no sub-chain and
        #   `raise_error` is true
        # @return [Array, nil]
        def get_chain_parameter(job_def, raise_error: true)
          definition = _job_type_definitions[job_def[:job].to_s]

          if definition.nil?
            raise "Job Type #{job_def[:job].to_s} does not accept a sub-chain" if raise_error
            return nil
          end

          mapper = ParamsMapper.new(job_def[:parameters])
          mapper[definition[:chain_parameter]] ||= []
        end

        # Enqueues the job described by `job_def`, using `perform_async` when
        # the job class responds to it and `perform_later` otherwise.
        def enqueue_job(job_def)
          # :job is a String after #normalize! but may still be a Class when
          # this method is called directly.
          job_class = job_def[:job].to_s.constantize
          job_options = job_def[:parameters] || []

          # Legacy Support
          if job_def[:options]
            job_options << {} unless job_options[-1].is_a?(Hash)
            job_options[-1].merge!(job_def[:options])
          end

          if job_class.respond_to? :perform_async
            job_class.perform_async(*job_options)
          else
            job_class.perform_later(*job_options)
          end
        end
      end
    end

    ChainBuilder.register_chain_job(ConcurrentBatchJob, 0)
    ChainBuilder.register_chain_job(SerialBatchJob, 0)

    # Uniform accessor over a job's parameter Array: Numeric keys address
    # positional parameters, any other key addresses the trailing Hash of
    # keyword-style parameters.
    class ParamsMapper
      # @param backend [Array] the parameter list; it is read and mutated in
      #   place
      def initialize(backend)
        @backend = backend
      end

      def [](key)
        get_parameter(key)
      end

      def []=(key, value)
        set_parameter(key, value)
      end

      # @return [Array] the underlying parameter list (not a copy)
      def to_a
        @backend
      end

      private

      def get_parameter(key)
        return @backend[key] if key.is_a?(Numeric)

        kwargs = @backend.last
        kwargs[key] if kwargs.is_a?(Hash)
      end

      def set_parameter(key, value)
        return @backend[key] = value if key.is_a?(Numeric)

        kwargs = @backend.last
        unless kwargs.is_a?(Hash)
          # No trailing keyword Hash yet: append one to hold the value.
          kwargs = {}
          @backend.push(kwargs)
        end
        kwargs[key] = value
      end
    end
  end
end