module CanvasSync::JobBatches
  # Builds, edits and enqueues a tree ("chain") of job definitions.
  #
  # A job definition is a Hash shaped like
  #   { job: <Class or String>, args: [...], kwargs: {...} }
  # (the legacy key :parameters is accepted in place of :args). Job types
  # registered through ChainBuilder.register_chain_job keep their list of
  # sub-jobs in one of their own arguments: a positional index into :args when
  # the registered chain parameter is Numeric, otherwise a key of :kwargs.
  class ChainBuilder
    VALID_PLACEMENT_PARAMETERS = %i[before after with].freeze

    attr_reader :base_job

    # @param base_type [Class, String, Hash] job type for the root of the
    #   chain, or an existing job definition Hash (which is adopted and
    #   mutated in place, not copied).
    # @param chain_id [String, nil] identifier used as the prefix of every
    #   chain link; a random one is generated when omitted.
    # @raise [RuntimeError] if the root job type does not accept a sub-chain.
    def initialize(base_type = SerialBatchJob, chain_id: nil)
      @chain_id = chain_id || SecureRandom.urlsafe_base64(10)

      if base_type.is_a?(Hash)
        @base_job = base_type
        @base_job[:args] ||= @base_job[:parameters] || []
        @base_job[:kwargs] ||= {}
      else
        @base_job = build_job_hash(base_type)
      end

      # Raises unless the root job type is registered as a chain job; also
      # initialises the (empty) sub-job list.
      self.class.get_chain_parameter(base_job)
    end

    # Normalizes the whole tree and enqueues the root job.
    def process!
      normalize!
      self.class.enqueue_job(base_job)
    end

    # With a Class key, returns the sub-chain for the single job of that type
    # (see #get_sub_chain). With any other key, reads that key from the root
    # job definition; :parameters is treated as an alias of :args.
    def [](key)
      return get_sub_chain(key) if key.is_a?(Class)

      # Legacy Support
      key = :args if key == :parameters
      @base_job[key]
    end

    # Positional arguments of the root job definition.
    def args
      self[:args]
    end

    # Keyword arguments of the root job definition.
    def kwargs
      self[:kwargs]
    end

    # Appends a job to the end of the root job's sub-job list.
    def <<(new_job)
      insert_at(-1, new_job)
    end

    # Inserts one or more jobs into the root job's sub-job list at +position+
    # (same semantics as Array#insert).
    #
    # When +new_jobs+ is a Class or String, a job definition is built from it
    # using +args+/+kwargs+, and +blk+ (if any) is evaluated against a builder
    # for that new job. Otherwise +new_jobs+ must already be a definition (or
    # an Array of them) and no extra arguments may be given.
    #
    # @return [Array] the sub-job list that was modified.
    # @raise [RuntimeError] if extra arguments accompany a prebuilt definition.
    def insert_at(position, new_jobs, *args, **kwargs, &blk)
      chain = self.class.get_chain_parameter(base_job)

      if new_jobs.is_a?(Class) || new_jobs.is_a?(String)
        new_jobs = build_job_hash(new_jobs, args: args, kwargs: kwargs, &blk)
      elsif args.count > 0 || kwargs.count > 0
        raise "Unexpected number of arguments"
      end

      new_jobs = [new_jobs] unless new_jobs.is_a?(Array)
      chain.insert(position, *new_jobs)
    end

    # Inserts one or more jobs, optionally relative to an existing job.
    #
    # Placement is selected with one of the keyword arguments listed in
    # VALID_PLACEMENT_PARAMETERS, whose value names the job type to search
    # for:
    #   before: / after: - place next to the matched job inside a SerialBatchJob
    #   with:            - place alongside the matched job inside a ConcurrentBatchJob
    # If the matched job's parent is not of the required type, the matched job
    # is first wrapped in a new parent of that type. Without a placement
    # parameter the jobs are appended to the root job's sub-job list.
    #
    # When +new_jobs+ is a Class or String, the remaining +args+ and
    # non-placement +kwargs+ become the new job's arguments.
    #
    # @return [Array] the sub-job list that was modified.
    # @raise [RuntimeError] on invalid placement parameters, or when the
    #   relative job is missing or ambiguous.
    def insert(new_jobs, *args, **kwargs, &blk)
      if new_jobs.is_a?(Class) || new_jobs.is_a?(String)
        job_kwargs = kwargs.except(*VALID_PLACEMENT_PARAMETERS)
        new_jobs = build_job_hash(new_jobs, args: args, kwargs: job_kwargs, &blk)
        kwargs = kwargs.slice(*VALID_PLACEMENT_PARAMETERS)
      else
        invalid_params = kwargs.keys - VALID_PLACEMENT_PARAMETERS
        raise "Invalid placement parameters: #{invalid_params.map(&:to_s).join(', ')}" if invalid_params.present?
        raise "At most one placement parameter may be provided" if kwargs.values.compact.length > 1
        raise "Unexpected number of arguments" if args.length > 0
      end

      new_jobs = [new_jobs] unless new_jobs.is_a?(Array)

      return insert_at(-1, new_jobs) unless kwargs.present?

      placement = kwargs.keys[0]
      relative_to = kwargs.values[0]

      matching_jobs = find_matching_jobs(relative_to).to_a
      raise "Could not find a \"#{relative_to}\" job in the chain" if matching_jobs.count == 0
      raise "Found multiple \"#{relative_to}\" jobs in the chain" if matching_jobs.count > 1

      _relative_job, parent_job, sub_index = matching_jobs[0]
      needed_parent_type = placement == :with ? ConcurrentBatchJob : SerialBatchJob

      chain = self.class.get_chain_parameter(parent_job)

      # :job may hold either a Class or its String name, so compare by name.
      if parent_job[:job].to_s != needed_parent_type.to_s
        # Wrap the matched job in a new parent of the required type. The
        # wrapper must carry :args/:kwargs because get_chain_parameter stores
        # the sub-job list inside them.
        old_job = chain[sub_index]
        parent_job = chain[sub_index] = {
          job: needed_parent_type,
          args: [],
          kwargs: {},
        }
        sub_index = 0
        chain = self.class.get_chain_parameter(parent_job)
        chain << old_job
      end

      if placement == :with
        chain.insert(-1, *new_jobs)
      else
        sub_index += 1 if placement == :after
        chain.insert(sub_index, *new_jobs)
      end
    end

    # True when the root job has no sub-jobs.
    def empty?
      self.class.get_chain_parameter(base_job).empty?
    end

    # Finds the single job of type +sub_type+ anywhere in the tree and returns
    # it as a ChainBuilder sharing this builder's chain id.
    #
    # @return [ChainBuilder, nil] nil when no such job exists.
    # @raise [RuntimeError] when more than one matching job exists, or when
    #   the matched job type does not accept a sub-chain.
    def get_sub_chain(sub_type)
      matching_jobs = find_matching_jobs(sub_type).to_a
      raise "Found multiple \"#{sub_type}\" jobs in the chain" if matching_jobs.count > 1
      return nil if matching_jobs.count == 0

      job = matching_jobs[0][0]
      job = self.class.new(job, chain_id: @chain_id) unless job.is_a?(ChainBuilder)
      job
    end

    # Recursively converts a definition tree into its enqueueable form: job
    # types become Strings, every job receives a :chain_link, and nested
    # ChainBuilder instances are replaced by their plain Hash definitions.
    #
    # @return [Hash] the normalized job definition.
    def normalize!(job_def = self.base_job)
      return job_def.normalize! if job_def.is_a?(ChainBuilder)

      job_def[:job] = job_def[:job].to_s
      # The step id is hex so it can never contain the '-' separator that
      # handle_step_complete splits the link on.
      job_def[:chain_link] ||= "#{@chain_id}-#{SecureRandom.hex(10)}"

      chain = self.class.get_chain_parameter(job_def, raise_error: false)
      chain.map! { |sub_job| normalize!(sub_job) } if chain.present?

      job_def
    end

    # Evaluates +blk+ in the context of this builder (DSL support). Does
    # nothing when no block is given.
    def apply_block(&blk)
      return unless blk.present?

      instance_exec(&blk)
    end

    private

    # Builds a job definition Hash. When a block is given it is evaluated
    # against a ChainBuilder wrapping the new Hash, so it can populate the new
    # job's sub-chain.
    def build_job_hash(job, args: [], kwargs: {}, &blk)
      hsh = {
        job: job,
        args: args,
        kwargs: kwargs,
      }
      self.class.new(hsh, chain_id: @chain_id).apply_block(&blk) if blk.present?
      hsh
    end

    # Depth-first search for jobs whose type name equals +search_job+.
    # Yields [job, parent_job, index_within_parent] for every match; returns
    # an Enumerator when called without a block. Matched jobs are not
    # searched any deeper.
    def find_matching_jobs(search_job, parent_job = self.base_job)
      return to_enum(:find_matching_jobs, search_job, parent_job) unless block_given?

      sub_jobs = self.class.get_chain_parameter(parent_job)
      sub_jobs.each_with_index do |sub_job, i|
        if sub_job[:job].to_s == search_job.to_s
          yield [sub_job, parent_job, i]
        elsif self.class._job_type_definitions[sub_job[:job].to_s]
          find_matching_jobs(search_job, sub_job) { |item| yield item }
        end
      end
    end

    # Returns the direct parent of +job_def+ within the tree, or nil when
    # +job_def+ is the root or is not found.
    def find_parent_job(job_def)
      iterate_job_tree do |job, path|
        return path[-1] if job == job_def
      end

      nil
    end

    # Pre-order traversal of the tree, calling +blk+ with (job, ancestors).
    def iterate_job_tree(root: self.base_job, path: [], &blk)
      blk.call(root, path)

      # The registry is keyed by class *name*, so :job must be stringified
      # before the lookup or Class-valued jobs would never be descended into.
      return unless self.class._job_type_definitions[root[:job].to_s]

      self.class.get_chain_parameter(root).each do |sub_job|
        iterate_job_tree(root: sub_job, path: [*path, root], &blk)
      end
    end

    class << self
      # Support builder syntax/DSL
      #   Chain.build(ConcurrentBatchJob) do
      #     insert(SomeJob, arg1, kwarg: 1)
      #     insert(SerialBatchJob) do
      #       insert(SomeJob, arg1, kwarg: 1)
      #     end
      #   end
      def build(job, *args, **kwargs, &blk)
        new(job).tap do |ch|
          ch.base_job[:args] = args
          ch.base_job[:kwargs] = kwargs
          ch.apply_block(&blk)
        end
      end

      # Registry of job types that accept a sub-chain, keyed by class name.
      def _job_type_definitions
        @job_type_definitions ||= {}
      end

      # Registers +job_class+ as accepting a sub-chain.
      #
      # @param chain_parameter [Numeric, Object] index into the job's :args
      #   (when Numeric) or key into its :kwargs that holds the sub-job list.
      def register_chain_job(job_class, chain_parameter, **options)
        _job_type_definitions[job_class.to_s] = {
          **options,
          chain_parameter: chain_parameter,
        }
      end

      # Returns the (mutable) Array of sub-jobs held by +job_def+, creating
      # it if absent.
      #
      # @param job_def [Hash, ChainBuilder]
      # @return [Array, nil] nil when the job type is not registered and
      #   +raise_error+ is false.
      # @raise [RuntimeError] when the job type is not registered and
      #   +raise_error+ is true.
      def get_chain_parameter(job_def, raise_error: true)
        job_def = job_def.base_job if job_def.is_a?(ChainBuilder)

        definition = _job_type_definitions[job_def[:job].to_s]
        unless definition.present?
          raise "Job Type #{job_def[:job].to_s} does not accept a sub-chain" if raise_error
          return nil
        end

        key = definition[:chain_parameter]
        if key.is_a?(Numeric)
          # Legacy Support: definitions may carry :parameters instead of :args
          job_def[:args] ||= job_def[:parameters] || []
          job_def[:args][key] ||= []
        else
          job_def[:kwargs] ||= {}
          job_def[:kwargs][key] ||= []
        end
      end

      # TODO: Add a Chain progress web View
      # Augment Batch tree-view with Chain data
      #   > [DONE] Tree view w/o Chain will only show Parent/Current batches and Job Counts
      #   > If augmented with Chain data, the above will be annotated with Chain-related info and will be able to show Jobs defined in the Chain
      #     > Chain-jobs will be supplied chain_id and chain_step_id metadata
      #     > Using server-middleware, if a Chain-job (has chain_id and chain_step_id) creates a Batch, tag the Batch w/ the chain_id and chain_step_id
      #     > UI will map Batches to Chain-steps using the chain_step_id. UI will add entries for any Chain-steps that were not tied to a Batch
      #   > [DONE] Use a Lua script to find child batch IDs. Support max_depth, items_per_depth, top_depth_slice parameters

      # Enqueues the job described by +job_def+, using perform_async when the
      # job class responds to it and perform_later otherwise.
      def enqueue_job(job_def)
        # :job is a String after normalize!, but tolerate a Class as well.
        job_class = job_def[:job].to_s.constantize
        job_args = job_def[:args] || job_def[:parameters] || []
        job_kwargs = job_def[:kwargs] || {}

        # Legacy Support: merge :options into a trailing positional Hash
        if job_def[:options]
          job_args << {} unless job_args[-1].is_a?(Hash)
          job_args[-1].merge!(job_def[:options])
        end

        if job_class.respond_to? :perform_async
          job_class.perform_async(*job_args, **job_kwargs)
        else
          job_class.perform_later(*job_args, **job_kwargs)
        end
      end

      # Not implemented yet.
      def link_to_batch!(chain_link, batch)
        # Or make chains a separate entity - Chains show batches, but batches don't show chain?
        # Or "Annotate" a Batch with chain data - could extract chain id from any job entry
      end

      # Not implemented yet.
      def annotate_batch!(batch, chain)
      end

      # Marks the chain step identified by opts[:chain_link] as complete in
      # the "CHAIN-<chain_id>-steps" Redis hash.
      def handle_step_complete(status, opts)
        chain_link = opts[:chain_link]
        # Split on the LAST '-': the chain id (urlsafe base64 or caller
        # supplied) may itself contain dashes.
        chain_id, _separator, chain_step_id = chain_link.rpartition('-')

        CanvasSync::JobBatches::Batch.redis.multi do |r|
          r.hset("CHAIN-#{chain_id}-steps", chain_step_id, "complete")
        end
      end
    end
  end

  ChainBuilder.register_chain_job(ConcurrentBatchJob, 0)
  ChainBuilder.register_chain_job(SerialBatchJob, 0)
end