lib/eco/api/session/batch/job.rb in eco-helpers-3.0.4 vs lib/eco/api/session/batch/job.rb in eco-helpers-3.0.5
- old
+ new
@@ -1,20 +1,25 @@
+# rubocop:disable Naming/MethodParameterName
module Eco
module API
class Session
class Batch
# @attr_reader name [String] the name of this `batch job`
# @attr_reader type [Symbol] a valid batch operation
- # @attr_reader sets [Array<Symbol>] the parts of the person model this batch is supposed to affect
- # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`
- # @attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch`
- # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and end-user decision making
+ # @attr_reader sets [Array<Symbol>] the parts of the person model this
+ # batch is supposed to affect
+ # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided:
+ # `usecase` that generated this `batch job`
+ # @attr_reader status [Eco::API::Session::Batch::Status] if launched:
+ # the `status` of the `batch`
+ # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class
+ # for feedback and end-user decision making
class Job < Eco::API::Common::Session::BaseSession
include Eco::Language::Methods::DslAble
- @types = [:get, :create, :update, :delete]
- @sets = [:core, :details, :account]
+ @types = %i[get create update delete]
+ @sets = %i[core details account]
class << self
attr_reader :types, :sets
def valid_type?(value)
@@ -29,25 +34,35 @@
attr_reader :name, :type, :sets
attr_reader :usecase
attr_reader :status, :feedback
- # @param e [Eco::API::Common::Session::Environment] requires a session environment, as any child of `Eco::API::Common::Session::BaseSession`.
+ # @param ev [Eco::API::Common::Session::Environment] requires a session environment, as
+ # any child of `Eco::API::Common::Session::BaseSession`.
# @param name [String] the name of this `batch job`
# @param type [Symbol] a valid batch operation.
# @param usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`.
- # This is necessary to know the `options` used to run the usecase, which could modify the `Batch::Job` behviour.
+ # This is necessary to know the `options` used to run the usecase, which could modify the
+ # `Batch::Job` behviour.
# @param accept_update_with_no_id [Boolean] temporary contingency
# This parameter has been added due to a bug on server side.
# An external_id is still required.
- def initialize(e, name:, type:, sets:, usecase: nil, accept_update_with_no_id: false)
- raise "A name is required to refer a job. Given: #{name}" if !name
- raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type)
- raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets)
- raise "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)
- super(e)
+ def initialize(ev, name:, type:, sets:, usecase: nil, accept_update_with_no_id: false)
+ msg = "A name is required to refer a job. Given: '#{name}'"
+ raise msg unless name
+ msg = "Type should be one of #{self.class.types}. Given: #{type}"
+ raise msg unless self.class.valid_type?(type)
+
+ msg = "Sets should be some of #{self.class.sets}. Given: #{sets}"
+ raise msg unless self.class.valid_sets?(sets)
+
+ msg = "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}"
+ raise msg if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)
+
+ super(ev)
+
@name = name
@type = type
@sets = [sets].flatten.compact
@usecase = usecase
@feedback = Eco::API::Session::Batch::Feedback.new(job: self)
@@ -67,16 +82,25 @@
# @note
# * this job will not be linked to the `Batch::Jobs` model of the current session
# * mostly used for error_handlers
# @return [Eco::API::Session::Batch::Job]
def dup(name = "ad-hoc:job-from:#{self.name}", usecase: self.usecase)
- self.class.new(enviro, name: name, type: type, sets: sets, usecase: usecase)
+ self.class.new(
+ enviro,
+ name: name,
+ type: type,
+ sets: sets,
+ usecase: usecase
+ )
end
# @return [Eco::API::Session::Batch::Jobs] group of subjobs of this `Batch::Job`
def subjobs
- @subjobs ||= Eco::API::Session::Batch::Jobs.new(enviro, name: "childs-of:#{self.name}")
+ @subjobs ||= Eco::API::Session::Batch::Jobs.new(
+ enviro,
+ name: "childs-of:#{name}"
+ )
end
# @return [Boolean] `true` if the current batch job is a result of an error_handler
def error_handler?
usecase? && usecase.is_a?(Eco::API::Error::Handler)
@@ -91,28 +115,31 @@
def options
usecase?? usecase.options : {}
end
# Adds an entry(ies) to the job queue.
- # @param entry [Ecoportal::API::V1::Person, Enumerable<Person>] the person(s) we want to update, carrying the changes to be done.
+ # @param entry [Ecoportal::API::V1::Person, Enumerable<Person>]
+ # the person(s) we want to update, carrying the changes to be done.
# @param unique [Boolean] specifies if repeated entries should be avoided in the queue.
- # @yield [person] callback before launching the batch job request against the server.
- # @yieldparam person [Person] current person object that that should be treated by the callback before launching the batch.
+ # @yield [person] callback before launching the batch job request
+ # against the server.
+ # @yieldparam person [Person] current person object that that should be
+ # treated by the callback before launching the batch.
# @return [Eco::API::Session::Batch::Job] this `Batch::Job`.
def add(entry, unique: true, &block)
case entry
when Enumerable
- entry.each {|e| add(e, unique: unique, &block)}
+ entry.each {|el| add(el, unique: unique, &block)}
else
- unless !entry
- unless unique && @queue_hash.key?(entry)
- @queue_hash[entry] = true
- @queue.push(entry)
- @callbacks[entry] = block if block_given?
- end
- end
+ return self unless entry
+ return self if unique && @queue_hash.key?(entry)
+
+ @queue_hash[entry] = true
+ @queue.push(entry)
+ @callbacks[entry] = block if block_given?
end
+
self
end
# @return [Boolean] has been this `batch job` launched?
def pending?
@@ -121,11 +148,13 @@
# @note it requires launch to be firstly invoked
# @raise [Exception] if 'launch' has not firstly invoked
# @return [Enumbrable<Hash>] the last requests that the queue will generate
def requests
- raise "Method missuse. Firstly 'launch' should be invoked" unless instance_variable_defined?(:@requests)
+ msg = "Method missuse. Firstly 'launch' should be invoked"
+ raise msg unless instance_variable_defined?(:@requests)
+
@requests
end
# @see Eco::API::Session::Batch::Feedback#request_stats
def request_stats(requests = nil)
@@ -133,11 +162,11 @@
end
# @see Eco::API::Session::Batch::Status#errors?
# @return [Boolean] `true` if there were Server errors, `false` otherwise
def errors?
- status && status.errors?
+ status&.errors?
end
# Helper/shortcut to obtain a people object out of `input`
# @note if `input` is not provided, it will use `queue`
# @return [Eco::API::Organization::People]
@@ -146,250 +175,336 @@
end
# Processes the `queue` and, unless `simulate` is `true`, launches against the server:
# 1. pre_processes the queue obtaining the `requests`:
# - if the entries of `queue` got pending _callbacks_ (delayed changes), it processes them
- # - unless type == `:create`: if there's a defined `api_excluded` _callback_ it calls it (see {Eco::API::Session::Config::People#api_excluded})
+ # - unless type == `:create`: if there's a defined `api_excluded`
+ # _callback_ it calls it (see {Eco::API::Session::Config::People#api_excluded})
# - transforms the result to a `Eco::API::Organization::People` object
- # - if there are `api policies` defined, it passes the entries through them in order (see {Eco::API::Session::Config#policies})
+ # - if there are `api policies` defined, it passes the entries through
+ # them in order (see {Eco::API::Session::Config#policies})
# - this step is **skipped** if the option `-skip-api-policies` was used in the command line
# - at this point all the transformations have taken place...
- # - only include the entries that, after all above, still hold pending changes (`!as_update.empty?`) to be launched as update
+ # - only include the entries that, after all above, still hold pending changes
+ # (`!as_update.empty?`) to be launched as update
# 2. pre launch checks against the `requests`:
# - it generates `stats` (`Eco::API::Session::Batch::RequestStats`) out of the requests
- # - if there is a batch policy declared for the current job `type`, it checks compliance against `stats` (see {Eco::API::Session::Batch::Policies}),
+ # - if there is a batch policy declared for the current job `type`, it
+ # checks compliance against `stats` (see {Eco::API::Session::Batch::Policies}),
# - a non-compliant batch will stop the current session by raising an `Exception`
# - this setp is **skipped** if the option `-skip-batch-policy` was used in the command line
# 3. if we are **not** in `dry-run` (or `simulate`), it:
- # - backs up the raw queries (`requests`) launched to the Server, if we are **not** in `dry-run` (or `simulate`)
+ # - backs up the raw queries (`requests`) launched to the Server,
+ # if we are **not** in `dry-run` (or `simulate`)
# - **launches the batch** request against the _Server_ (see {Eco::API::Session::Batch#launch})
# - links the resulting batch `status` to this `Batch::Job` (see {Eco::API::Session::Batch::Status})
# - prints any `errors` replied by the _Server_
# 4. the post launch kicks in, and:
- # - for success requests, it consolidates the associated entries (see `Ecoportal::API::V1::Person#consolidate!`)
- # - launches specific error handlers, if there were **errors** from the Server as a result of the `batch.launch`, and there are `Error::Handlers` defined
+ # - for success requests, it consolidates the associated entries
+ # (see `Ecoportal::API::V1::Person#consolidate!`)
+ # - launches specific error handlers, if there were **errors** from the Server
+ # as a result of the `batch.launch`, and there are `Error::Handlers` defined
# @return [Eco::API::Session::Batch::Status]
- def launch(simulate: false)
+ def launch(simulate: false) # rubocop:disable Metrics/AbcSize
pqueue = processed_queue
@requests = as_update(pqueue)
pre_checks(requests, simulate: simulate)
if simulate
if options.dig(:requests, :backup)
req_backup = as_update(pqueue, add_feedback: false)
backup_update(req_backup, simulate: simulate)
end
- else
- if pqueue.length > 0
- req_backup = as_update(pqueue, add_feedback: false)
- backup_update(req_backup)
- logger.debug("Job ('#{name}':#{type}): going to launch batch against #{pqueue.count} entries")
- session.batch.launch(pqueue, method: type).tap do |job_status|
- @status = job_status
- status.root = self
- status.errors.print
- end
+ elsif !pqueue.empty?
+ req_backup = as_update(pqueue, add_feedback: false)
+ backup_update(req_backup)
+ log(:debug) {
+ "Job ('#{name}':#{type}): going to launch batch against #{pqueue.count} entries"
+ }
+
+ session.batch.launch(pqueue, method: type).tap do |job_status|
+ @status = job_status
+ status.root = self
+ status.errors.print
end
end
unless requests.empty? || !simulate
- logger.info("--- simulate mode (dry-run) -- job '#{name}' -- this would have launched #{type.to_s.upcase}")
+ msg = "--- simulate mode (dry-run) -- job '#{name}' "
+ msg << "-- this would have launched #{type.to_s.upcase}"
+ log(:info) { msg }
end
post_launch(queue: pqueue, simulate: simulate)
@pending = false
- return status
+
+ status
end
# Provides a text summary of the current status including:
# 1. stats of the changes introduced by the job in the different parts of the person model
# 2. if the job is compliant with the batch policy
# 3. error messages in case they were errors from the server
# @note if `launch` was not invoked, it specifies so
# @return [String] the summary
- def summary
+ def summary # rubocop:disable Metrics/AbcSize
[].tap do |msg|
if pending?
msg << "PENDING - Batch #{type.to_s.upcase} - job '#{name}' - length: #{@queue.length}"
else
- msg << feedback.generate(requests, only_stats: true)
+ msg << feedback.generate(requests, only_stats: true)
+
if batch_policy && !batch_policy.compliant?(request_stats)
msg << "Batch Policy Uncompliance:"
msg << batch_policy.uncompliance(request_stats)
end
- msg << status.errors.message unless !status
+ msg << status.errors.message if status
msg << subjobs_summary
end
end.join("\n")
end
private
def subjobs_summary
- return "" unless subjobs.count > 0
+ return "" unless subjobs.count.positive?
+
[].tap do |msg|
subjobs.map {|subjob| msg << subjob.summary}
end.join("\n")
end
def as_update(data, **kargs)
- if data.is_a?(Array)
- data.map do |e|
- feedback.as_update(e, **kargs)
- end.compact.select {|e| e && !e.empty?}
- else
- feedback.as_update(data, **kargs)
- end
+ return feedback.as_update(data, **kargs) unless data.is_a?(Array)
+
+ data.map do |el|
+ feedback.as_update(el, **kargs)
+ end.compact.reject(&:empty?)
end
# Method to generate the base of people that will be present in the queue
# @note
# - If the entry is a new person, we are not in a creation job and the person doesn't have `id`
# it means that it failed to be created (it doesn't exist on server-side).
# The entry won't be included.
# - The contingency above wouldn't be necessary if the server worked perfectly.
def processed_queue
- pre_filtered = @queue.select do |entry|
+ pre_filtered = pre_filtered_queue
+ pre_filtered.each do |el|
+ @callbacks[el].call(el) if @callbacks.key?(el)
+ end
+
+ apply_policies(api_included(pre_filtered)).reject do |el|
+ as_update(el).empty?
+ end.select do |el|
+ next true unless el.is_a?(Ecoportal::API::V1::Person)
+ next true unless el.new?
+
+ # new people should either have account or details
+ el.account || el.details
+ end
+ end
+
+ def pre_filtered_queue
+ @queue.select do |entry|
by_pass_filter = false
- if unexisting = entry.new? && !entry.id && type != :create
+
+ if (unexisting = entry.new? && !entry.id && type != :create)
ref = Eco::API::Session::Batch::Feedback.person_ref(entry)
+
msg = "Job ('#{name}':#{type}): "
if (by_pass_filter = entry.external_id && @accept_update_with_no_id)
- msg << "entry errored on creation (failed creation) but will try with person code: #{ref}"
+ msg << "entry errored on creation (failed creation)"
+ msg << " but will try with person code: #{ref}"
else
msg << "excluded unexisting entry (failed creation): #{ref}"
end
- session.logger.warn(msg)
+
+ log(:warn) { msg }
end
+
!unexisting || by_pass_filter
end
- pre_filtered.each {|e| @callbacks[e].call(e) if @callbacks.key?(e) }
- apply_policies(api_included(pre_filtered)).select do |e|
- !as_update(e).empty?
- end.select do |e|
- next true unless e.is_a?(Ecoportal::API::V1::Person)
- next true unless e.new?
- # new people should either have account or details
- e.account || e.details
- end
end
# if there is a config definition to exclude entries
# and the current batch is not a creation batch
# - filter out excluded entries from the api update
def api_included(full_queue)
return full_queue if type == :create
- return full_queue unless excluded_callback = session.config.people.api_excluded
+ return full_queue unless (excluded_callback = session.config.people.api_excluded)
inc_excluded = options.dig(:include, :excluded)
excluded_only = inc_excluded.is_a?(Hash) && excluded[:only]
- is_excluded = Proc.new do |entry|
+
+ is_excluded = proc do |entry|
evaluate(entry, session, options, self, &excluded_callback)
end
+
return full_queue.select(&is_excluded) if excluded_only
return full_queue if inc_excluded
+
full_queue.reject(&is_excluded)
end
# Applies the changes introduced by api policies
def apply_policies(pre_queue)
people(pre_queue).tap do |entries|
+ next if options.dig(:skip, :api_policies)
+
policies = session.policies
- unless policies.empty? || options.dig(:skip, :api_policies)
- policies.launch(people: entries, session: session, options: options, job: self)
- end
+ next if policies.empty?
+
+ policies.launch(
+ people: entries,
+ session: session,
+ options: options,
+ job: self
+ )
end
end
# Shortcut to get the batch (belt) policy
def batch_policy
- unless options.dig(:skip, :batch_policy)
- @batch_policy ||= session.config.batch_policies[self.type]
- end
+ return if options.dig(:skip, :batch_policy)
+
+ @batch_policy ||= session.config.batch_policies[type]
end
# Checks batch policy compliance and displays the feedback on request stats
def pre_checks(requests, simulate: false)
only_stats = options.dig(:feedback, :only_stats)
max_chars = simulate ? 2500 : 800
- msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats)
- logger.info(msg)
+ msg = feedback.generate(
+ requests,
+ max_chars: max_chars,
+ only_stats: only_stats
+ )
+ log(:info) { msg }
+
+ return unless batch_policy
+
# batch_policy
- stats = request_stats(requests)
- if simulate && batch_policy && !batch_policy.compliant?(stats)
- logger.warn("Batch Policy Uncompliance: this and next batches will be aborted!")
- logger.warn(batch_policy.uncompliance(stats))
- elsif batch_policy
+ stats = request_stats(requests)
+
+ if simulate && !batch_policy.compliant?(stats)
+ msg = "Batch Policy Uncompliance: this and next batches will be aborted!\n"
+ msg << batch_policy.uncompliance(stats)
+ log(:warn) { msg }
+ else
# will throw an Exception if the policy request_stats is not compliant
batch_policy.validate!(stats)
end
end
# after launched to the server
# 1. `consolidate!` person model if succeeded (person.doc -> person.original_doc)
# 2. if there were errors: launch specific error handlers if they are defined for the type of error
def post_launch(queue: [], simulate: false)
- if !simulate && status
- status.queue.map do |entry|
- if status.success?(entry)
- if type == :create && entry.respond_to?(:id=)
- entry.id = status[entry].body["id"].tap do |id|
- if id.to_s.strip.empty?
- ref = Eco::API::Session::Batch::Feedback.person_ref(entry)
- msg = "Entry has been created but API did not return its 'id': #{ref}"
- if response = status[entry]
- msg << " (Response - status: #{response.status.code}; body: #{response.body.pretty_inspect})"
- end
- logger.error(msg)
- end
- end
- end
- if entry.respond_to?(:consolidate!)
- entry.consolidate!
- if entry.respond_to?(:dirty?) && entry.dirty? && entry.respond_to?(:as_update)
- msg = "After consolidate there's still a dirty model:\n"
- msg << entry.as_update.pretty_inspect
- logger.debug(msg)
- end
- end
- #else # do not entry.reset! (keep track on changes still)
- end
+ return consolidate_changes_dry_run!(queue: queue) if simulate
+
+ consolidate_changes_actual_run!
+ run_error_handlers!
+ end
+
+ def consolidate_changes_dry_run!(queue: [])
+ fake_id = 111_111_111_111_111_111_111_111
+
+ queue.each do |entry|
+ if type == :create && entry.respond_to?(:id=)
+ entry.id = fake_id.to_s
+ fake_id += 1
end
- # launch error_handlers
- handlers = session.config.error_handlers
- if status.errors.any? && !handlers.empty? && !error_handler?
- err_types = status.errors.by_type
- logger.debug("(#{self.name}) got these error types: #{err_types.keys}")
- handlers.each do |handler|
- if entries = err_types[handler.name]
- handler_job = subjobs_add("#{self.name} => #{handler.name}", usecase: handler)
- logger.debug("Running error handler #{handler.name} (against #{entries.count} entries)")
- handler.launch(people: people(entries), session: session, options: options, job: handler_job)
- logger.debug("Launching job of error handler: #{handler_job.name}")
- handler_job.launch(simulate: simulate)
+
+ entry.consolidate! if entry.respond_to?(:consolidate!)
+ end
+ end
+
+ def consolidate_changes_actual_run! # rubocop:disable Metrics/AbcSize
+ return unless status
+
+ status.queue.each do |entry|
+ next unless status.success?(entry)
+
+ if type == :create && entry.respond_to?(:id=)
+ entry.id = status[entry].body["id"].tap do |id|
+ next unless id.to_s.strip.empty?
+
+ ref = Eco::API::Session::Batch::Feedback.person_ref(entry)
+ msg = "Entry has been created but API did not return its 'id': #{ref}"
+
+ if (response = status[entry])
+ msg << " (Response - status: #{response.status.code}; "
+ msg << "body: #{response.body.pretty_inspect})"
end
+
+ log(:error) { msg }
end
end
- elsif simulate
- fake_id = 111111111111111111111111
- queue.map do |entry|
- if type == :create && entry.respond_to?(:id=)
- entry.id = fake_id.to_s
- fake_id += 1
- end
- entry.consolidate! if entry.respond_to?(:consolidate!)
- end
+
+ next unless entry.respond_to?(:consolidate!)
+
+ entry.consolidate!
+
+ next unless entry.respond_to?(:dirty?)
+ next unless entry.dirty?
+ next unless entry.respond_to?(:as_update)
+
+ msg = "After consolidate there's still a dirty model:\n"
+ msg << entry.as_update.pretty_inspect
+ log(:debug) { msg }
end
end
+ def run_error_handlers! # rubocop:disable Metrics/AbcSize
+ return if error_handler?
+ return unless status
+ return unless status.errors.any?
+
+ handlers = session.config.error_handlers
+ return if handlers.empty?
+
+ # launch error_handlers
+ err_types = status.errors.by_type
+ log(:debug) {
+ "(#{name}) got these error types: #{err_types.keys}"
+ }
+
+ handlers.each do |handler|
+ next unless (entries = err_types[handler.name])
+
+ handler_job = subjobs_add(
+ "#{name} => #{handler.name}",
+ usecase: handler
+ )
+
+ msg = "Running error handler #{handler.name}"
+ msg << " (against #{entries.count} entries)"
+ log(:debug) { msg }
+
+ handler.launch(
+ people: people(entries),
+ session: session,
+ options: options,
+ job: handler_job
+ )
+
+ log(:debug) {
+ "Launching job of error handler: #{handler_job.name}"
+ }
+
+ handler_job.launch(simulate: false)
+ end
+ end
+
# Keep a copy of the requests for future reference
def backup_update(requests, simulate: false)
dry_run = simulate ? "_dry_run" : ""
dir = config.people.requests_folder
- filename = name.split(" ").join("-").gsub(/[=\\\/><,"-]+/,"_")
+ filename = name.split(" ").join("-").gsub(/[=\\\/><,"-]+/, "_") # rubocop:disable Style/RedundantArgument
file = File.join(dir, "#{type}_data_#{filename}#{dry_run}.json")
file_manager.save_json(requests, file, :timestamp)
end
# Adds a job tied to the current job
@@ -402,5 +517,7 @@
end
end
end
end
end
+
+# rubocop:enable Naming/MethodParameterName