lib/eco/api/session/batch/job.rb in eco-helpers-1.0.13 vs lib/eco/api/session/batch/job.rb in eco-helpers-1.0.14
- old
+ new
@@ -1,10 +1,16 @@
module Eco
module API
class Session
class Batch
- class Job < API::Common::Session::BaseSession
+ # @attr_reader name [String] the name of this `batch job`
+ # @attr_reader type [Symbol] a valid batch operation
+ # @attr_reader sets [Array<Symbol>] the parts of the person model this batch is supposed to affect
+ # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`
+ # @attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch`
+ # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and decision making
+ class Job < Eco::API::Common::Session::BaseSession
@types = [:get, :create, :update, :delete]
@sets = [:core, :details, :account]
class << self
attr_reader :types, :sets
@@ -17,24 +23,30 @@
sts = [value].flatten
sts.all? { |s| sets.include?(s) }
end
end
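For orientation, these class-level validators can be exercised directly; a minimal sketch grounded only in the `@types` and `@sets` values declared above (`valid_type?` is defined in the part of the singleton class elided by the hunk):

    Eco::API::Session::Batch::Job.types                          # => [:get, :create, :update, :delete]
    Eco::API::Session::Batch::Job.valid_type?(:update)           # => true
    Eco::API::Session::Batch::Job.valid_sets?([:core, :account]) # => true
    Eco::API::Session::Batch::Job.valid_sets?(:payroll)          # => false (not part of @sets)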
- attr_reader :name, :type, :status
+ attr_reader :name, :type, :sets
attr_reader :usecase
+ attr_reader :status, :feedback
+ # @param e [Eco::API::Common::Session::Environment] requires a session environment, as does any child of `Eco::API::Common::Session::BaseSession`
+ # @param name [String] the name of this `batch job`
+ # @param type [Symbol] a valid batch operation
+ # @param usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job`
def initialize(e, name:, type:, sets:, usecase: nil)
raise "A name is required to refer a job. Given: #{name}" if !name
- raise "Type should be one of #{self.class.types}. Given: #{type}" if !self.class.valid_type?(type)
- raise "Sets should be some of #{self.class.sets}. Given: #{sets}" if !self.class.valid_sets?(sets)
+ raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type)
+ raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets)
raise "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)
super(e)
- @name = name
- @type = type
- @usecase = usecase
- @sets = [sets].flatten.compact
+ @name = name
+ @type = type
+ @sets = [sets].flatten.compact
+ @usecase = usecase
+ @feedback = Eco::API::Session::Batch::Feedback.new(job: self)
reset
end
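A hedged construction sketch; `enviro` is a placeholder for an `Eco::API::Common::Session::Environment` you would already hold, and the job name is made up:

    job = Eco::API::Session::Batch::Job.new(
      enviro,                    # session environment, as any child of BaseSession
      name: "update-people",     # hypothetical name
      type: :update,             # must satisfy Job.valid_type?
      sets: [:core, :details]    # must satisfy Job.valid_sets?
    )
    job.pending?    # => true  (reset leaves the job unlaunched)
    job.feedback    # => the Batch::Feedback helper wired up in the constructor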
def reset
@queue = []
@@ -42,27 +54,26 @@
@callbacks = {}
@pending = true
@status = nil
end
+ # @return [Boolean] was this `batch job` generated by a `usecase`? (`Eco::API::UseCases::UseCase`)
def usecase?
!!usecase
end
+ # @return [Hash] options the root `usecase` is run with
def options
usecase?? usecase.options : {}
end
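In other words, a job created without a use case degrades gracefully:

    job.usecase?   # => false when no usecase was passed to the constructor
    job.options    # => {} in that case; otherwise the root usecase's options hash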
- def signature
- "Batch job \"#{name}\" ['#{type.to_s.upcase}': #{sets_title}]"
- end
-
def match?(type:, sets:)
sets = [sets].flatten
- type == self.type && (sets.order == @sets.order)
+ type == self.type && (sets.order == self.sets.order)
end
+ # @return [Boolean] whether this `batch job` is still pending to be launched
def pending?
@pending
end
# Adds an entry (or entries) to the job queue.
@@ -78,27 +89,44 @@
else
unless !entry
unless unique && @queue_hash.key?(entry)
@queue_hash[entry] = true
@queue.push(entry)
- @callbacks[entry] = Proc.new if block_given?
+ @callbacks[entry] = Proc.new if block_given?
end
end
end
end
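The signature of this queueing method sits outside the hunk; assuming, purely for illustration, that it reads `def add(entry, unique: true)`, queueing a delayed change might look like:

    # `person` is a hypothetical Ecoportal::API::V1::Person already in scope
    job.add(person, unique: true) do |p|
      p.details["hired_on"] = "2019-01-01"   # callback runs when the queue is processed
    end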
+ # Helper/shortcut to obtain a people object out of `input`
+ # @note if `input` is not provided, it will use `queue`
+ # @return [Eco::API::Organization::People]
def people(input = @queue)
Eco::API::Organization::People.new(input)
end
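A quick sketch of that helper, with no assumptions beyond the code shown:

    job.people              # wraps the current queue in an Eco::API::Organization::People
    job.people([person])    # or wraps whatever enumerable of entries you hand it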
+ # Processes the `queue` and, unless `simulate` is `true`, launches it against the server
+ # (a hedged usage sketch follows this method below):
+ # 1. if entries of the `queue` have pending _callbacks_ (delayed changes), it processes them
+ # 2. unless type == `:create`: if an `api_excluded` _callback_ is defined, it calls it (see `Eco::API::Session::Config::People#api_excluded`)
+ # 3. transforms the result into an `Eco::API::Organization::People` object
+ # 4. if there are `api policies` defined, it passes the entries through them in order (see `Eco::API::Session::Config#policies`)
+ # 5. at this point all the transformations have taken place...
+ # 6. ...so it only keeps the entries that still hold pending changes (`!as_update.empty?`) to be launched as an update
+ # 7. if we are **not** in `dry-run` (or `simulate`), it launches the batch request against the server (see `Eco::API::Session::Batch#launch`)
+ # 8. it then links the resulting batch `status` to this `Batch::Job` (see `Eco::API::Session::Batch::Status`)
+ # 9. the post-launch kicks in and, for successful requests, consolidates the associated entries (see `Ecoportal::API::V1::Person#consolidate!`)
+ # 10. it launches specific error handlers if there were **errors** from the server as a result of the `batch.launch` and there are `Error::Handlers` defined
+ # 11. if we are **not** in `dry-run` (or `simulate`), it backs up the raw queries launched to the server
def launch(simulate: false)
- pqueue = processed_queue
- launch_feedback(pqueue, simulate ? 2500 : 800)
+ pqueue = processed_queue
+ requests = pqueue.map {|e| as_update(e)}
+ pre_checks(requests, simulate: simulate)
+
if !simulate
if pqueue.length > 0
- backup_update(pqueue)
+ backup_update(requests)
@status = session.batch.launch(pqueue, method: type)
@status.root = self
end
end
@@ -109,112 +137,87 @@
return @status
end
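Putting the numbered steps above into a hedged usage sketch (the `status` naming is ours; the return value is the linked `Batch::Status`, which stays `nil` on a pure dry-run):

    job.launch(simulate: true)     # dry-run: pre-checks and feedback only; no server call, no backup
    status = job.launch            # real run: launches, links Batch::Status, post-launch, backup
    status.errors.any?             # the linked status exposes the server-side outcome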
private
+ def as_update(*args)
+ feedback.as_update(*args)
+ end
+
def processed_queue
@queue.each {|e| @callbacks[e].call(e) if @callbacks.key?(e) }
apply_policies(api_included(@queue)).select {|e| !as_update(e).empty?}
end
- def post_launch(queue: [], simulate: false)
- if !simulate && @status
- @status.queue.map do |entry|
- if @status.success?(entry)
- entry.consolidate! if entry.respond_to?(:consolidate!)
- #else # do not entry.reset! (keep track on changes still)
- end
- end
- # launch_error handlers
- handlers = session.config.error_handlers
- if @status.errors.any? && !handlers.empty?
- err_types = @status.errors.by_type
- handlers.each do |handler|
- if entries = err_types[handler.name]
- handler.launch(people: people(entries), session: session, options: options)
- end
- end
- end
- elsif simulate
- queue.map do |entry|
- entry.consolidate! if entry.respond_to?(:consolidate!)
- end
- end
- end
-
# if there is a config definition to exclude entries
# and the current batch is not a creation batch
# - filter out excluded entries from the api update
def api_included(full_queue)
return full_queue if type == :create
return full_queue unless excluded = session.config.people.api_excluded
full_queue.select {|entry| !excluded.call(entry, session, options, self)}
end
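How the exclusion hook gets defined is not visible here; the only contract this method relies on is that `session.config.people.api_excluded` returns something callable as `(entry, session, options, job)`. A sketch under that assumption (the assignment form is hypothetical):

    session.config.people.api_excluded = proc do |entry, _session, _options, _job|
      entry.respond_to?(:email) && entry.email.to_s.end_with?("@external.example")
    end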
def apply_policies(pre_queue)
- #pre_queue.tap do |entries|
people(pre_queue).tap do |entries|
policies = session.config.policies
- unless policies.empty?
+ unless policies.empty? || options.dig(:skip, :api_policies)
policies.launch(people: entries, session: session, options: options)
end
end
end
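The new guard makes the policy pass skippable per run; the options shape is read straight off the `dig` call:

    options = { skip: { api_policies: true } }   # entries bypass session.config.policies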
- def as_update(entry)
- hash = entry if entry.is_a?(Hash)
- if only_ids?
- hash = entry.as_json.slice("id", "external_id", "email")
- else
- if entry.is_a?(Ecoportal::API::V1::Person)
- hash = entry.as_update
- if hfields = hash.dig("details", "fields")
- hash["details"]["fields"] = hfields.map do |fld|
- fld.merge!("alt_id" => entry.details.get_field(fld["id"]).alt_id) if entry.details
- end
- end
- end
-
- fields = hash&.dig('details', 'fields')
- fields&.map! { |fld| fld&.slice("id", "alt_id", "value") }
+ def batch_policy
+ unless options.dig(:skip, :batch_policy)
+ @batch_policy ||= session.config.batch_policies[self.type]
end
- hash || {}
end
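The batch policy follows the same skip pattern; when skipped, the `unless` falls through and `batch_policy` returns `nil`:

    options = { skip: { batch_policy: true } }   # pre_checks then skips compliance validation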
- def only_ids?
- [:delete, :get].include?(type)
- end
+ def pre_checks(requests, simulate: false)
+ only_stats = options.dig(:feedback, :only_stats)
+ max_chars = simulate ? 2500 : 800
+ msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats)
+ logger.info(msg)
- def sets_title
- "#{@sets.map {|s| s.to_s}.join(", ")}"
+ @request_stats = feedback.request_stats(requests)
+ if simulate && batch_policy && !batch_policy.compliant?(@request_stats)
+ logger.warn("Batch Policy Uncompliance: this and next batches will be aborted!")
+ logger.warn(batch_policy.uncompliance(@request_stats))
+ elsif batch_policy
+ # raises an exception if `request_stats` does not comply with the batch policy
+ batch_policy.validate!(@request_stats)
+ end
end
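The feedback knob visible above suggests this options shape (hedged; only `:only_stats` appears in the code). Note the asymmetry: with `simulate: true` a non-compliant batch policy only logs warnings, while a real run lets `batch_policy.validate!` raise and abort:

    options = { feedback: { only_stats: true } }   # log request stats only, no request sample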
- def launch_feedback(data, max_chars = 800)
- if !data || !data.is_a?(Enumerable) || data.empty?
- logger.warn("#{"*" * 20} Nothing for #{signature} so far :) #{"*" * 20}")
- return
+ def post_launch(queue: [], simulate: false)
+ if !simulate && @status
+ @status.queue.map do |entry|
+ if @status.success?(entry)
+ entry.consolidate! if entry.respond_to?(:consolidate!)
+ #else # do not entry.reset! (keep track on changes still)
+ end
+ end
+ # launch_error handlers
+ handlers = session.config.error_handlers
+ if @status.errors.any? && !handlers.empty?
+ err_types = @status.errors.by_type
+ handlers.each do |handler|
+ if entries = err_types[handler.name]
+ handler.launch(people: people(entries), session: session, options: options)
+ end
+ end
+ end
+ elsif simulate
+ queue.map do |entry|
+ entry.consolidate! if entry.respond_to?(:consolidate!)
+ end
end
- header = ("*" * 20) + " #{signature} - Feedback Sample " + ("*" * 20)
- logger.info(header)
-
- sample_length = 1
- sample = data.slice(0, 20).map do |entry|
- update = as_update(entry)
- max_chars -= update.pretty_inspect.length
- sample_length += 1 if max_chars > 0
- update
- end
-
- logger.info("#{sample.slice(0, sample_length).pretty_inspect}")
- logger.info("#{type.to_s.upcase} length: #{data.length}")
- logger.info("*" * header.length)
end
- def backup_update(data)
- data_body = data.map { |u| as_update(u) }
+ def backup_update(requests)
dir = config.people.requests_folder
file = File.join(dir, "#{type}_data.json")
- file_manager.save_json(data_body, file, :timestamp)
+ file_manager.save_json(requests, file, :timestamp)
end
end
end
end