lib/eco/api/session/batch/job.rb in eco-helpers-3.0.4 vs lib/eco/api/session/batch/job.rb in eco-helpers-3.0.5

- old
+ new

@@ -1,20 +1,25 @@ +# rubocop:disable Naming/MethodParameterName module Eco module API class Session class Batch # @attr_reader name [String] the name of this `batch job` # @attr_reader type [Symbol] a valid batch operation - # @attr_reader sets [Array<Symbol>] the parts of the person model this batch is supposed to affect - # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job` - # @attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch` - # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and end-user decision making + # @attr_reader sets [Array<Symbol>] the parts of the person model this + # batch is supposed to affect + # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: + # `usecase` that generated this `batch job` + # @attr_reader status [Eco::API::Session::Batch::Status] if launched: + # the `status` of the `batch` + # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class + # for feedback and end-user decision making class Job < Eco::API::Common::Session::BaseSession include Eco::Language::Methods::DslAble - @types = [:get, :create, :update, :delete] - @sets = [:core, :details, :account] + @types = %i[get create update delete] + @sets = %i[core details account] class << self attr_reader :types, :sets def valid_type?(value) @@ -29,25 +34,35 @@ attr_reader :name, :type, :sets attr_reader :usecase attr_reader :status, :feedback - # @param e [Eco::API::Common::Session::Environment] requires a session environment, as any child of `Eco::API::Common::Session::BaseSession`. + # @param ev [Eco::API::Common::Session::Environment] requires a session environment, as + # any child of `Eco::API::Common::Session::BaseSession`. # @param name [String] the name of this `batch job` # @param type [Symbol] a valid batch operation. 
# @param usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`. - # This is necessary to know the `options` used to run the usecase, which could modify the `Batch::Job` behaviour. + # This is necessary to know the `options` used to run the usecase, which could modify the + # `Batch::Job` behaviour. # @param accept_update_with_no_id [Boolean] temporary contingency # This parameter has been added due to a bug on server side. # An external_id is still required. - def initialize(e, name:, type:, sets:, usecase: nil, accept_update_with_no_id: false) - raise "A name is required to refer a job. Given: #{name}" if !name - raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type) - raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets) - raise "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase) - super(e) + def initialize(ev, name:, type:, sets:, usecase: nil, accept_update_with_no_id: false) + msg = "A name is required to refer a job. Given: '#{name}'" + raise msg unless name + msg = "Type should be one of #{self.class.types}. Given: #{type}" + raise msg unless self.class.valid_type?(type) + + msg = "Sets should be some of #{self.class.sets}. Given: #{sets}" + raise msg unless self.class.valid_sets?(sets) + + msg = "usecase must be a Eco::API::UseCases::UseCase object. 
Given: #{usecase.class}" + raise msg if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase) + + super(ev) + @name = name @type = type @sets = [sets].flatten.compact @usecase = usecase @feedback = Eco::API::Session::Batch::Feedback.new(job: self) @@ -67,16 +82,25 @@ # @note # * this job will not be linked to the `Batch::Jobs` model of the current session # * mostly used for error_handlers # @return [Eco::API::Session::Batch::Job] def dup(name = "ad-hoc:job-from:#{self.name}", usecase: self.usecase) - self.class.new(enviro, name: name, type: type, sets: sets, usecase: usecase) + self.class.new( + enviro, + name: name, + type: type, + sets: sets, + usecase: usecase + ) end # @return [Eco::API::Session::Batch::Jobs] group of subjobs of this `Batch::Job` def subjobs - @subjobs ||= Eco::API::Session::Batch::Jobs.new(enviro, name: "childs-of:#{self.name}") + @subjobs ||= Eco::API::Session::Batch::Jobs.new( + enviro, + name: "childs-of:#{name}" + ) end # @return [Boolean] `true` if the current batch job is a result of an error_handler def error_handler? usecase? && usecase.is_a?(Eco::API::Error::Handler) @@ -91,28 +115,31 @@ def options usecase?? usecase.options : {} end # Adds an entry(ies) to the job queue. - # @param entry [Ecoportal::API::V1::Person, Enumerable<Person>] the person(s) we want to update, carrying the changes to be done. + # @param entry [Ecoportal::API::V1::Person, Enumerable<Person>] + # the person(s) we want to update, carrying the changes to be done. # @param unique [Boolean] specifies if repeated entries should be avoided in the queue. - # @yield [person] callback before launching the batch job request against the server. - # @yieldparam person [Person] current person object that that should be treated by the callback before launching the batch. + # @yield [person] callback before launching the batch job request + # against the server. 
+ # @yieldparam person [Person] current person object that should be + # treated by the callback before launching the batch. # @return [Eco::API::Session::Batch::Job] this `Batch::Job`. def add(entry, unique: true, &block) case entry when Enumerable - entry.each {|e| add(e, unique: unique, &block)} + entry.each {|el| add(el, unique: unique, &block)} else - unless !entry - unless unique && @queue_hash.key?(entry) - @queue_hash[entry] = true - @queue.push(entry) - @callbacks[entry] = block if block_given? - end - end + return self unless entry + return self if unique && @queue_hash.key?(entry) + + @queue_hash[entry] = true + @queue.push(entry) + @callbacks[entry] = block if block_given? end + self end # @return [Boolean] has been this `batch job` launched? def pending? @@ -121,11 +148,13 @@ # @note it requires launch to be firstly invoked # @raise [Exception] if 'launch' has not been invoked first # @return [Enumerable<Hash>] the last requests that the queue will generate def requests - raise "Method missuse. Firstly 'launch' should be invoked" unless instance_variable_defined?(:@requests) + msg = "Method missuse. Firstly 'launch' should be invoked" + raise msg unless instance_variable_defined?(:@requests) + @requests end # @see Eco::API::Session::Batch::Feedback#request_stats def request_stats(requests = nil) @@ -133,11 +162,11 @@ end # @see Eco::API::Session::Batch::Status#errors? # @return [Boolean] `true` if there were Server errors, `false` otherwise def errors? - status && status.errors? + status&.errors? end # Helper/shortcut to obtain a people object out of `input` # @note if `input` is not provided, it will use `queue` # @return [Eco::API::Organization::People] @@ -146,250 +175,336 @@ end # Processes the `queue` and, unless `simulate` is `true`, launches against the server: # 1. 
pre_processes the queue obtaining the `requests`: # - if the entries of `queue` got pending _callbacks_ (delayed changes), it processes them - # - unless type == `:create`: if there's a defined `api_excluded` _callback_ it calls it (see {Eco::API::Session::Config::People#api_excluded}) + # - unless type == `:create`: if there's a defined `api_excluded` + # _callback_ it calls it (see {Eco::API::Session::Config::People#api_excluded}) # - transforms the result to a `Eco::API::Organization::People` object - # - if there are `api policies` defined, it passes the entries through them in order (see {Eco::API::Session::Config#policies}) + # - if there are `api policies` defined, it passes the entries through + # them in order (see {Eco::API::Session::Config#policies}) # - this step is **skipped** if the option `-skip-api-policies` was used in the command line # - at this point all the transformations have taken place... - # - only include the entries that, after all above, still hold pending changes (`!as_update.empty?`) to be launched as update + # - only include the entries that, after all above, still hold pending changes + # (`!as_update.empty?`) to be launched as update # 2. pre launch checks against the `requests`: # - it generates `stats` (`Eco::API::Session::Batch::RequestStats`) out of the requests - # - if there is a batch policy declared for the current job `type`, it checks compliance against `stats` (see {Eco::API::Session::Batch::Policies}), + # - if there is a batch policy declared for the current job `type`, it + # checks compliance against `stats` (see {Eco::API::Session::Batch::Policies}), # - a non-compliant batch will stop the current session by raising an `Exception` # - this step is **skipped** if the option `-skip-batch-policy` was used in the command line # 3. 
if we are **not** in `dry-run` (or `simulate`), it: - # - backs up the raw queries (`requests`) launched to the Server, if we are **not** in `dry-run` (or `simulate`) + # - backs up the raw queries (`requests`) launched to the Server, + # if we are **not** in `dry-run` (or `simulate`) # - **launches the batch** request against the _Server_ (see {Eco::API::Session::Batch#launch}) # - links the resulting batch `status` to this `Batch::Job` (see {Eco::API::Session::Batch::Status}) # - prints any `errors` replied by the _Server_ # 4. the post launch kicks in, and: - # - for success requests, it consolidates the associated entries (see `Ecoportal::API::V1::Person#consolidate!`) - # - launches specific error handlers, if there were **errors** from the Server as a result of the `batch.launch`, and there are `Error::Handlers` defined + # - for success requests, it consolidates the associated entries + # (see `Ecoportal::API::V1::Person#consolidate!`) + # - launches specific error handlers, if there were **errors** from the Server + # as a result of the `batch.launch`, and there are `Error::Handlers` defined # @return [Eco::API::Session::Batch::Status] - def launch(simulate: false) + def launch(simulate: false) # rubocop:disable Metrics/AbcSize pqueue = processed_queue @requests = as_update(pqueue) pre_checks(requests, simulate: simulate) if simulate if options.dig(:requests, :backup) req_backup = as_update(pqueue, add_feedback: false) backup_update(req_backup, simulate: simulate) end - else - if pqueue.length > 0 - req_backup = as_update(pqueue, add_feedback: false) - backup_update(req_backup) - logger.debug("Job ('#{name}':#{type}): going to launch batch against #{pqueue.count} entries") - session.batch.launch(pqueue, method: type).tap do |job_status| - @status = job_status - status.root = self - status.errors.print - end + elsif !pqueue.empty? 
+ req_backup = as_update(pqueue, add_feedback: false) + backup_update(req_backup) + log(:debug) { + "Job ('#{name}':#{type}): going to launch batch against #{pqueue.count} entries" + } + + session.batch.launch(pqueue, method: type).tap do |job_status| + @status = job_status + status.root = self + status.errors.print end end unless requests.empty? || !simulate - logger.info("--- simulate mode (dry-run) -- job '#{name}' -- this would have launched #{type.to_s.upcase}") + msg = "--- simulate mode (dry-run) -- job '#{name}' " + msg << "-- this would have launched #{type.to_s.upcase}" + log(:info) { msg } end post_launch(queue: pqueue, simulate: simulate) @pending = false - return status + + status end # Provides a text summary of the current status including: # 1. stats of the changes introduced by the job in the different parts of the person model # 2. if the job is compliant with the batch policy # 3. error messages in case they were errors from the server # @note if `launch` was not invoked, it specifies so # @return [String] the summary - def summary + def summary # rubocop:disable Metrics/AbcSize [].tap do |msg| if pending? msg << "PENDING - Batch #{type.to_s.upcase} - job '#{name}' - length: #{@queue.length}" else - msg << feedback.generate(requests, only_stats: true) + msg << feedback.generate(requests, only_stats: true) + if batch_policy && !batch_policy.compliant?(request_stats) msg << "Batch Policy Uncompliance:" msg << batch_policy.uncompliance(request_stats) end - msg << status.errors.message unless !status + msg << status.errors.message if status msg << subjobs_summary end end.join("\n") end private def subjobs_summary - return "" unless subjobs.count > 0 + return "" unless subjobs.count.positive? 
+ [].tap do |msg| subjobs.map {|subjob| msg << subjob.summary} end.join("\n") end def as_update(data, **kargs) - if data.is_a?(Array) - data.map do |e| - feedback.as_update(e, **kargs) - end.compact.select {|e| e && !e.empty?} - else - feedback.as_update(data, **kargs) - end + return feedback.as_update(data, **kargs) unless data.is_a?(Array) + + data.map do |el| + feedback.as_update(el, **kargs) + end.compact.reject(&:empty?) end # Method to generate the base of people that will be present in the queue # @note # - If the entry is a new person, we are not in a creation job and the person doesn't have `id` # it means that it failed to be created (it doesn't exist on server-side). # The entry won't be included. # - The contingency above wouldn't be necessary if the server worked perfectly. def processed_queue - pre_filtered = @queue.select do |entry| + pre_filtered = pre_filtered_queue + pre_filtered.each do |el| + @callbacks[el].call(el) if @callbacks.key?(el) + end + + apply_policies(api_included(pre_filtered)).reject do |el| + as_update(el).empty? + end.select do |el| + next true unless el.is_a?(Ecoportal::API::V1::Person) + next true unless el.new? + + # new people should either have account or details + el.account || el.details + end + end + + def pre_filtered_queue + @queue.select do |entry| by_pass_filter = false - if unexisting = entry.new? && !entry.id && type != :create + + if (unexisting = entry.new? 
&& !entry.id && type != :create) ref = Eco::API::Session::Batch::Feedback.person_ref(entry) + msg = "Job ('#{name}':#{type}): " if (by_pass_filter = entry.external_id && @accept_update_with_no_id) - msg << "entry errored on creation (failed creation) but will try with person code: #{ref}" + msg << "entry errored on creation (failed creation)" + msg << " but will try with person code: #{ref}" else msg << "excluded unexisting entry (failed creation): #{ref}" end - session.logger.warn(msg) + + log(:warn) { msg } end + !unexisting || by_pass_filter end - pre_filtered.each {|e| @callbacks[e].call(e) if @callbacks.key?(e) } - apply_policies(api_included(pre_filtered)).select do |e| - !as_update(e).empty? - end.select do |e| - next true unless e.is_a?(Ecoportal::API::V1::Person) - next true unless e.new? - # new people should either have account or details - e.account || e.details - end end # if there is a config definition to exclude entries # and the current batch is not a creation batch # - filter out excluded entries from the api update def api_included(full_queue) return full_queue if type == :create - return full_queue unless excluded_callback = session.config.people.api_excluded + return full_queue unless (excluded_callback = session.config.people.api_excluded) inc_excluded = options.dig(:include, :excluded) excluded_only = inc_excluded.is_a?(Hash) && excluded[:only] - is_excluded = Proc.new do |entry| + + is_excluded = proc do |entry| evaluate(entry, session, options, self, &excluded_callback) end + return full_queue.select(&is_excluded) if excluded_only return full_queue if inc_excluded + full_queue.reject(&is_excluded) end # Applies the changes introduced by api policies def apply_policies(pre_queue) people(pre_queue).tap do |entries| + next if options.dig(:skip, :api_policies) + policies = session.policies - unless policies.empty? 
|| options.dig(:skip, :api_policies) - policies.launch(people: entries, session: session, options: options, job: self) - end + next if policies.empty? + + policies.launch( + people: entries, + session: session, + options: options, + job: self + ) end end # Shortcut to get the batch (belt) policy def batch_policy - unless options.dig(:skip, :batch_policy) - @batch_policy ||= session.config.batch_policies[self.type] - end + return if options.dig(:skip, :batch_policy) + + @batch_policy ||= session.config.batch_policies[type] end # Checks batch policy compliance and displays the feedback on request stats def pre_checks(requests, simulate: false) only_stats = options.dig(:feedback, :only_stats) max_chars = simulate ? 2500 : 800 - msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats) - logger.info(msg) + msg = feedback.generate( + requests, + max_chars: max_chars, + only_stats: only_stats + ) + log(:info) { msg } + + return unless batch_policy + # batch_policy - stats = request_stats(requests) - if simulate && batch_policy && !batch_policy.compliant?(stats) - logger.warn("Batch Policy Uncompliance: this and next batches will be aborted!") - logger.warn(batch_policy.uncompliance(stats)) - elsif batch_policy + stats = request_stats(requests) + + if simulate && !batch_policy.compliant?(stats) + msg = "Batch Policy Uncompliance: this and next batches will be aborted!\n" + msg << batch_policy.uncompliance(stats) + log(:warn) { msg } + else # will throw an Exception if the policy request_stats is not compliant batch_policy.validate!(stats) end end # after launched to the server # 1. `consolidate!` person model if succeeded (person.doc -> person.original_doc) # 2. 
if there were errors: launch specific error handlers if they are defined for the type of error def post_launch(queue: [], simulate: false) - if !simulate && status - status.queue.map do |entry| - if status.success?(entry) - if type == :create && entry.respond_to?(:id=) - entry.id = status[entry].body["id"].tap do |id| - if id.to_s.strip.empty? - ref = Eco::API::Session::Batch::Feedback.person_ref(entry) - msg = "Entry has been created but API did not return its 'id': #{ref}" - if response = status[entry] - msg << " (Response - status: #{response.status.code}; body: #{response.body.pretty_inspect})" - end - logger.error(msg) - end - end - end - if entry.respond_to?(:consolidate!) - entry.consolidate! - if entry.respond_to?(:dirty?) && entry.dirty? && entry.respond_to?(:as_update) - msg = "After consolidate there's still a dirty model:\n" - msg << entry.as_update.pretty_inspect - logger.debug(msg) - end - end - #else # do not entry.reset! (keep track on changes still) - end + return consolidate_changes_dry_run!(queue: queue) if simulate + + consolidate_changes_actual_run! + run_error_handlers! + end + + def consolidate_changes_dry_run!(queue: []) + fake_id = 111_111_111_111_111_111_111_111 + + queue.each do |entry| + if type == :create && entry.respond_to?(:id=) + entry.id = fake_id.to_s + fake_id += 1 end - # launch error_handlers - handlers = session.config.error_handlers - if status.errors.any? && !handlers.empty? && !error_handler? 
- err_types = status.errors.by_type - logger.debug("(#{self.name}) got these error types: #{err_types.keys}") - handlers.each do |handler| - if entries = err_types[handler.name] - handler_job = subjobs_add("#{self.name} => #{handler.name}", usecase: handler) - logger.debug("Running error handler #{handler.name} (against #{entries.count} entries)") - handler.launch(people: people(entries), session: session, options: options, job: handler_job) - logger.debug("Launching job of error handler: #{handler_job.name}") - handler_job.launch(simulate: simulate) + + entry.consolidate! if entry.respond_to?(:consolidate!) + end + end + + def consolidate_changes_actual_run! # rubocop:disable Metrics/AbcSize + return unless status + + status.queue.each do |entry| + next unless status.success?(entry) + + if type == :create && entry.respond_to?(:id=) + entry.id = status[entry].body["id"].tap do |id| + next unless id.to_s.strip.empty? + + ref = Eco::API::Session::Batch::Feedback.person_ref(entry) + msg = "Entry has been created but API did not return its 'id': #{ref}" + + if (response = status[entry]) + msg << " (Response - status: #{response.status.code}; " + msg << "body: #{response.body.pretty_inspect})" end + + log(:error) { msg } end end - elsif simulate - fake_id = 111111111111111111111111 - queue.map do |entry| - if type == :create && entry.respond_to?(:id=) - entry.id = fake_id.to_s - fake_id += 1 - end - entry.consolidate! if entry.respond_to?(:consolidate!) - end + + next unless entry.respond_to?(:consolidate!) + + entry.consolidate! + + next unless entry.respond_to?(:dirty?) + next unless entry.dirty? + next unless entry.respond_to?(:as_update) + + msg = "After consolidate there's still a dirty model:\n" + msg << entry.as_update.pretty_inspect + log(:debug) { msg } end end + def run_error_handlers! # rubocop:disable Metrics/AbcSize + return if error_handler? + return unless status + return unless status.errors.any? 
+ + handlers = session.config.error_handlers + return if handlers.empty? + + # launch error_handlers + err_types = status.errors.by_type + log(:debug) { + "(#{name}) got these error types: #{err_types.keys}" + } + + handlers.each do |handler| + next unless (entries = err_types[handler.name]) + + handler_job = subjobs_add( + "#{name} => #{handler.name}", + usecase: handler + ) + + msg = "Running error handler #{handler.name}" + msg << " (against #{entries.count} entries)" + log(:debug) { msg } + + handler.launch( + people: people(entries), + session: session, + options: options, + job: handler_job + ) + + log(:debug) { + "Launching job of error handler: #{handler_job.name}" + } + + handler_job.launch(simulate: false) + end + end + # Keep a copy of the requests for future reference def backup_update(requests, simulate: false) dry_run = simulate ? "_dry_run" : "" dir = config.people.requests_folder - filename = name.split(" ").join("-").gsub(/[=\\\/><,"-]+/,"_") + filename = name.split(" ").join("-").gsub(/[=\\\/><,"-]+/, "_") # rubocop:disable Style/RedundantArgument file = File.join(dir, "#{type}_data_#{filename}#{dry_run}.json") file_manager.save_json(requests, file, :timestamp) end # Adds a job tied to the current job @@ -402,5 +517,7 @@ end end end end end + +# rubocop:enable Naming/MethodParameterName