module Eco
  module API
    class Session
      class Batch
        # @attr_reader name [String] the name of this `batch job`
        # @attr_reader type [Symbol] a valid batch operation
        # @attr_reader sets [Array] the parts of the person model this batch is supposed to affect
        # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: the `usecase` that generated this `batch job`
        # @attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch`
        # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and decision making
        class Job < Eco::API::Common::Session::BaseSession
          @types = [:get, :create, :update, :delete]
          @sets  = [:core, :details, :account]

          class << self
            attr_reader :types, :sets

            def valid_type?(value)
              types.include?(value)
            end

            def valid_sets?(value)
              sts = [value].flatten
              sts.all? { |s| sets.include?(s) }
            end
          end

          attr_reader :name, :type, :sets
          attr_reader :usecase
          attr_reader :status, :feedback

          # @param e [Eco::API::Common::Session::Environment] requires a session environment, as does any child of `Eco::API::Common::Session::BaseSession`
          # @param name [String] the name of this `batch job`
          # @param type [Symbol] a valid batch operation
          # @param sets [Symbol, Array<Symbol>] the parts of the person model this batch job is supposed to affect
          # @param usecase [Eco::API::UseCases::UseCase, nil] when provided: the `usecase` that generated this `batch job`
          def initialize(e, name:, type:, sets:, usecase: nil)
            raise "A name is required to refer to a job. Given: #{name}" if !name
            raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type)
            raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets)
            raise "usecase must be an Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)

            super(e)
            @name     = name
            @type     = type
            @sets     = [sets].flatten.compact
            @usecase  = usecase
            @feedback = Eco::API::Session::Batch::Feedback.new(job: self)
            reset
          end

          def reset
            @queue      = []
            @queue_hash = {}
            @callbacks  = {}
            @pending    = true
            @status     = nil
          end

          # @return [Boolean] was this `batch job` generated by a `usecase`? (`Eco::API::UseCases::UseCase`)
          def usecase?
            !!usecase
          end

          # @return [Hash] options the root `usecase` is run with
          def options
            usecase? ? usecase.options : {}
          end

          # @return [Boolean] does this job target the same `type` and `sets` (regardless of order)?
          def match?(type:, sets:)
            sets = [sets].flatten
            type == self.type && (sets.sort == self.sets.sort)
          end

          # @return [Boolean] has this `batch job` been launched yet?
          def pending?
            @pending
          end

          # Adds an entry (or entries) to the job queue.
          # @param entry [Person, Enumerable] the person(s) we want to update, carrying the changes to be done.
          # @param unique [Boolean] specifies if repeated entries should be avoided in the queue.
          # @yield [person] callback before launching the batch job request against the server.
          # @yieldparam person [Person] the current person object that should be treated by the callback before launching the batch.
          # @return [void]
          def add(entry, unique: true, &block)
            case entry
            when Enumerable
              entry.each { |e| add(e, unique: unique, &block) }
            else
              if entry
                unless unique && @queue_hash.key?(entry)
                  @queue_hash[entry] = true
                  @queue.push(entry)
                  @callbacks[entry] = block if block_given?
                end
              end
            end
          end

          # Helper/shortcut to obtain a people object out of `input`.
          # @note if `input` is not provided, it will use `queue`
          # @return [Eco::API::Organization::People]
          def people(input = @queue)
            Eco::API::Organization::People.new(input)
          end
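
          # Illustrative usage sketch, not part of the class itself. It assumes an
          # `enviro` session environment and a `person` entry (an
          # `Ecoportal::API::V1::Person`-like object) already exist, and only
          # exercises methods defined above (`add`, `pending?`):
          #
          #   job = Eco::API::Session::Batch::Job.new(
          #     enviro,
          #     name: "update-people",
          #     type: :update,
          #     sets: [:core, :details]
          #   )
          #
          #   job.add(person) do |prepared|
          #     # delayed change: this callback runs when `launch` processes the queue
          #     prepared.name = prepared.name.to_s.strip
          #   end
          #   job.add(person)   # skipped: `unique: true` (the default) de-duplicates
          #   job.pending?      # => true, until `launch` is called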

          # Processes the `queue` and, unless `simulate` is `true`, launches it against the server:
          #   1. if any entries of the `queue` have pending _callbacks_ (delayed changes), it runs them
          #   2. unless `type == :create`: if there's a defined `api_excluded` _callback_, it calls it (see `Eco::API::Session::Config::People#api_excluded`)
          #   3. transforms the result into an `Eco::API::Organization::People` object
          #   4. if there are `api policies` defined, it passes the entries through them, in order (see `Eco::API::Session::Config#policies`)
          #   5. at this point, all the transformations have taken place...
          #   6. keeps only the entries that, after all of the above, still hold pending changes (`!as_update.empty?`) to be launched as an update
          #   7. if we are **not** in `dry-run` (or `simulate`), launches the batch request against the server (see `Eco::API::Session::Batch#launch`)
          #   8. links the resulting batch `status` to this `Batch::Job` (see `Eco::API::Session::Batch::Status`)
          #   9. kicks in the post-launch phase, which consolidates the entries of successful requests (see `Ecoportal::API::V1::Person#consolidate!`)
          #   10. launches the specific error handlers, if there were **errors** from the server as a result of the `batch.launch`, and there are `Error::Handlers` defined
          #   11. if we are **not** in `dry-run` (or `simulate`), backs up the raw queries launched to the server
          def launch(simulate: false)
            pqueue   = processed_queue
            requests = pqueue.map { |e| as_update(e) }

            pre_checks(requests, simulate: simulate)

            unless simulate
              unless pqueue.empty?
                backup_update(requests)
                @status = session.batch.launch(pqueue, method: type)
                @status.root = self
              end
            end

            post_launch(queue: pqueue, simulate: simulate)

            logger.info("Simulate: this would have launched: '#{type}'") if simulate
            @pending = false
            @status
          end

          private

          def as_update(*args)
            feedback.as_update(*args)
          end

          def processed_queue
            @queue.each { |e| @callbacks[e].call(e) if @callbacks.key?(e) }
            apply_policies(api_included(@queue)).select { |e| !as_update(e).empty? }
          end

          # If there is a config definition to exclude certain entries,
          # and the current batch is not a creation batch,
          # it filters out the excluded entries from the api update.
          def api_included(full_queue)
            return full_queue if type == :create
            return full_queue unless (excluded = session.config.people.api_excluded)
            full_queue.select { |entry| !excluded.call(entry, session, options, self) }
          end

          def apply_policies(pre_queue)
            people(pre_queue).tap do |entries|
              policies = session.config.policies
              unless policies.empty? || options.dig(:skip, :api_policies)
                policies.launch(people: entries, session: session, options: options)
              end
            end
          end

          def batch_policy
            unless options.dig(:skip, :batch_policy)
              @batch_policy ||= session.config.batch_policies[self.type]
            end
          end

          def pre_checks(requests, simulate: false)
            only_stats = options.dig(:feedback, :only_stats)
            max_chars  = simulate ? 2500 : 800
            msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats)
            logger.info(msg)

            @request_stats = feedback.request_stats(requests)
            if simulate && batch_policy && !batch_policy.compliant?(@request_stats)
              logger.warn("Batch policy non-compliance: this and subsequent batches will be aborted!")
              logger.warn(batch_policy.uncompliance(@request_stats))
            elsif batch_policy
              # raises an exception if the request stats are not compliant with the policy
              batch_policy.validate!(@request_stats)
            end
          end
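
          # Hedged sketch of the dry-run behaviour implemented by `launch` and
          # `pre_checks` above (`job` as in the earlier example): with `simulate: true`
          # the queue is still processed and the feedback/batch-policy checks still run
          # (with a larger `max_chars` budget for the feedback message), but no server
          # request and no requests backup are made.
          #
          #   status = job.launch(simulate: true)
          #   status        # => nil: no `Batch::Status` is created without a real launch
          #   job.pending?  # => false: the job ran, albeit simulated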

          def post_launch(queue: [], simulate: false)
            if !simulate && @status
              @status.queue.each do |entry|
                if @status.success?(entry)
                  entry.consolidate! if entry.respond_to?(:consolidate!)
                # else: do not `entry.reset!` (keep track of the changes still)
                end
              end
              # launch the error handlers
              handlers = session.config.error_handlers
              if @status.errors.any? && !handlers.empty?
                err_types = @status.errors.by_type
                handlers.each do |handler|
                  if (entries = err_types[handler.name])
                    handler.launch(people: people(entries), session: session, options: options)
                  end
                end
              end
            elsif simulate
              queue.each do |entry|
                entry.consolidate! if entry.respond_to?(:consolidate!)
              end
            end
          end

          def backup_update(requests)
            dir  = config.people.requests_folder
            file = File.join(dir, "#{type}_data.json")
            file_manager.save_json(requests, file, :timestamp)
          end
        end
      end
    end
  end
end
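
# Hedged post-launch sketch; it relies only on the `Batch::Status` calls that
# appear in `post_launch` above (`queue`, `success?`, `errors.by_type`):
#
#   if (status = job.launch)
#     failed = status.queue.reject { |entry| status.success?(entry) }
#     puts "#{failed.length} failed entries, by error type: #{status.errors.by_type.keys}"
#   end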