lib/eco/api/session/batch/job.rb in eco-helpers-1.5.1 vs lib/eco/api/session/batch/job.rb in eco-helpers-1.5.2
- old
+ new
@@ -5,11 +5,11 @@
# @attr_reader name [String] the name of this `batch job`
# @attr_reader type [Symbol] a valid batch operation
# @attr_reader sets [Array<Symbol>] the parts of the person model this batch is supposed to affect
# @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`
# @attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch`
- # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and decision making
+ # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and end-user decision making
class Job < Eco::API::Common::Session::BaseSession
@types = [:get, :create, :update, :delete]
@sets = [:core, :details, :account]
class << self
@@ -27,14 +27,15 @@
attr_reader :name, :type, :sets
attr_reader :usecase
attr_reader :status, :feedback
- # @param e [Eco::API::Common::Session::Environment] requires a session environment, as any child of `Eco::API::Common::Session::BaseSession`
+ # @param e [Eco::API::Common::Session::Environment] requires a session environment, as any child of `Eco::API::Common::Session::BaseSession`.
# @param name [String] the name of this `batch job`
- # @param type [Symbol] a valid batch operation
- # @param usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`
+ # @param type [Symbol] a valid batch operation.
+ # @param usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`.
+ # This is necessary to know the `options` used to run the usecase, which could modify the `Batch::Job` behaviour.
def initialize(e, name:, type:, sets:, usecase: nil)
raise "A name is required to refer a job. Given: #{name}" if !name
raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type)
raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets)
raise "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)
@@ -59,14 +60,25 @@
# Creates an empty `Batch::Job` with same behaviour as the current one
# @note
# * this job will not be linked to the `Batch::Jobs` model of the current session
# * mostly used for error_handlers
# @return [Eco::API::Session::Batch::Job]
- def dup(name = "ad-hoc:job-from:#{self.name}", usecase: nil)
+ def dup(name = "ad-hoc:job-from:#{self.name}", usecase: self.usecase)
self.class.new(enviro, name: name, type: type, sets: sets, usecase: usecase)
end
+ # @return [Eco::API::Session::Batch::Jobs] group of subjobs of this `Batch::Job`
+ def subjobs
+ @subjobs ||= Eco::API::Session::Batch::Jobs.new(enviro, name: "childs-of:#{self.name}")
+ end
+
+ def subjobs_add(name = "ad-hoc:job-from:#{self.name}", usecase: self.usecase, &block)
+ dup(name, usecase: usecase).tap do |subjob|
+ subjobs.add(subjob, &block)
+ end
+ end
+
# @return [Boolean] `true` if the current batch job is a result of an error_handler
def error_handler?
usecase? && usecase.is_a?(Eco::API::Error::Handler)
end
@@ -79,15 +91,15 @@
def options
usecase?? usecase.options : {}
end
# Adds an entry(ies) to the job queue.
- # @param entry [Person, Enumberable<Person>] the person(s) we want to update, carrying the changes to be done.
+ # @param entry [Ecoportal::API::V1::Person, Enumerable<Person>] the person(s) we want to update, carrying the changes to be done.
# @param unique [Boolean] specifies if repeated entries should be avoided in the queue.
# @yield [person] callback before launching the batch job request against the server.
- # @yeldparam param [Person] current person object that that should be treated by the callback before launching the batch.
- # @return [Void]
+ # @yieldparam param [Person] current person object that should be treated by the callback before launching the batch.
+ # @return [Eco::API::Session::Batch::Job] this `Batch::Job`.
def add(entry, unique: true, &block)
case entry
when Enumerable
entry.each {|e| add(e, unique: unique, &block)}
else
@@ -97,17 +109,13 @@
@queue.push(entry)
@callbacks[entry] = Proc.new if block_given?
end
end
end
+ self
end
- #def match?(type:, sets:)
- # sets = [sets].flatten
- # type == self.type && (sets.order == self.sets.order)
- #end
-
# @return [Boolean] has been this `batch job` launched?
def pending?
@pending
end
@@ -138,34 +146,33 @@
end
# Processes the `queue` and, unless `simulate` is `true`, launches against the server:
# 1. pre_processes the queue obtaining the `requests`:
# - if the entries of `queue` got pending _callbacks_ (delayed changes), it processes them
- # - unless type == `:create`: if there's a defined `api_excluded` _callback_ it calls it (see `Eco::API::Session::Config::People#api_excluded`)
+ # - unless type == `:create`: if there's a defined `api_excluded` _callback_ it calls it (see {Eco::API::Session::Config::People#api_excluded})
# - transforms the result to a `Eco::API::Organization::People` object
- # - if there are `api policies` defined, it passes the entries through them in order (see `Eco::API::Session::Config#policies`)
+ # - if there are `api policies` defined, it passes the entries through them in order (see {Eco::API::Session::Config#policies})
# - this step is **skipped** if the option `-skip-api-policies` was used in the command line
# - at this point all the transformations have taken place...
# - only include the entries that, after all above, still hold pending changes (`!as_update.empty?`) to be launched as update
# 2. pre launch checks against the `requests`:
# - it generates `stats` (`Eco::API::Session::Batch::RequestStats`) out of the requests
- # - if there is a batch policy declared for the current job `type`, it checks compliance against `stats` (`Eco::API::Session::Batch::Policies`),
+ # - if there is a batch policy declared for the current job `type`, it checks compliance against `stats` (see {Eco::API::Session::Batch::Policies}),
# - a non-compliant batch will stop the current session by raising an `Exception`
# - this step is **skipped** if the option `-skip-batch-policy` was used in the command line
# 3. if we are **not** in `dry-run` (or `simulate`), it:
# - backs up the raw queries (`requests`) launched to the Server, if we are **not** in `dry-run` (or `simulate`)
- # - **launches the batch** request against the _Server_ (see `Eco::API::Session::Batch#launch`)
- # - links the resulting batch `status` to this `Batch::Job` (see `Eco::API::Session::Batch::Status`)
+ # - **launches the batch** request against the _Server_ (see {Eco::API::Session::Batch#launch})
+ # - links the resulting batch `status` to this `Batch::Job` (see {Eco::API::Session::Batch::Status})
# - prints any `errors` replied by the _Server_
# 4. the post launch kicks in, and:
# - for success requests, it consolidates the associated entries (see `Ecoportal::API::V1::Person#consolidate!`)
# - launches specific error handlers, if there were **errors** from the Server as a result of the `batch.launch`, and there are `Error::Handlers` defined
# @return [Eco::API::Session::Batch::Status]
def launch(simulate: false)
pqueue = processed_queue
@requests = pqueue.map {|e| as_update(e)}
-
pre_checks(requests, simulate: simulate)
unless simulate
if pqueue.length > 0
backup_update(requests)
@@ -175,20 +182,23 @@
status.errors.print
end
end
end
- unless requests.empty?
- logger.info("--- simulate mode (dry-run) -- job '#{name}' -- this would have launched #{type.to_s.upcase}") if simulate
+ unless requests.empty? || !simulate
+ logger.info("--- simulate mode (dry-run) -- job '#{name}' -- this would have launched #{type.to_s.upcase}")
end
post_launch(queue: pqueue, simulate: simulate)
@pending = false
return status
end
- # Provides a text summary of the current status
+ # Provides a text summary of the current status including:
+ # 1. stats of the changes introduced by the job in the different parts of the person model
+ # 2. if the job is compliant with the batch policy
+ # 3. error messages in case there were errors from the server
# @note if `launch` was not invoked, it specifies so
# @return [String] the summary
def summary
[].tap do |msg|
if pending?
@@ -223,25 +233,28 @@
return full_queue if type == :create
return full_queue unless excluded = session.config.people.api_excluded
full_queue.select {|entry| !excluded.call(entry, session, options, self)}
end
+ # Applies the changes introduced by api policies
def apply_policies(pre_queue)
people(pre_queue).tap do |entries|
policies = session.config.policies
unless policies.empty? || options.dig(:skip, :api_policies)
policies.launch(people: entries, session: session, options: options, job: self)
end
end
end
+ # Shortcut to get the batch (belt) policy
def batch_policy
unless options.dig(:skip, :batch_policy)
@batch_policy ||= session.config.batch_policies[self.type]
end
end
+ # Checks batch policy compliance and displays the feedback on request stats
def pre_checks(requests, simulate: false)
only_stats = options.dig(:feedback, :only_stats)
max_chars = simulate ? 2500 : 800
msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats)
logger.info(msg)
@@ -255,10 +268,13 @@
# will throw an Exception if the policy request_stats is not compliant
batch_policy.validate!(stats)
end
end
+ # Runs after the batch has been launched to the server:
+ # 1. `consolidate!` person model if succeeded (person.doc -> person.original_doc)
+ # 2. if there were errors: launch specific error handlers if they are defined for the type of error
def post_launch(queue: [], simulate: false)
if !simulate && status
status.queue.map do |entry|
if status.success?(entry)
entry.consolidate! if entry.respond_to?(:consolidate!)
@@ -269,11 +285,11 @@
handlers = session.config.error_handlers
if status.errors.any? && !handlers.empty? && !error_handler?
err_types = status.errors.by_type
handlers.each do |handler|
if entries = err_types[handler.name]
- handler_job = self.dup("#{self.name} => #{handler.name}", usecase: handler)
+ handler_job = subjobs_add("#{self.name} => #{handler.name}", usecase: handler)
handler.launch(people: people(entries), session: session, options: options, job: handler_job)
handler_job.launch(simulate: simulate)
end
end
end
@@ -282,9 +298,10 @@
entry.consolidate! if entry.respond_to?(:consolidate!)
end
end
end
+ # Keep a copy of the requests for future reference
def backup_update(requests)
dir = config.people.requests_folder
file = File.join(dir, "#{type}_data.json")
file_manager.save_json(requests, file, :timestamp)
end