lib/eco/api/session/batch/job.rb in eco-helpers-1.0.13 vs lib/eco/api/session/batch/job.rb in eco-helpers-1.0.14

- old
+ new

@@ -1,10 +1,16 @@ module Eco module API class Session class Batch - class Job < API::Common::Session::BaseSession + # @attr_reader name [String] the name of this `batch job` + # @attr_reader type [Symbol] a valid batch operation + # @attr_reader sets [Array<Symbol>] the parts of the person model this batch is supposed to affect + # @attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job` + # @attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch` + # @attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and decision making + class Job < Eco::API::Common::Session::BaseSession @types = [:get, :create, :update, :delete] @sets = [:core, :details, :account] class << self attr_reader :types, :sets @@ -17,24 +23,30 @@ sts = [value].flatten sts.all? { |s| sets.include?(s) } end end - attr_reader :name, :type, :status + attr_reader :name, :type, :sets attr_reader :usecase + attr_reader :status, :feedback + # @param e [Eco::API::Common::Session::Environment] requires a session environmen, as any child of `Eco::API::Common::Session::BaseSession` + # @param name [String] the name of this `batch job` + # @param type [Symbol] a valid batch operation + # @param usecase [Eco::API::UseCases::UseCase, nil] when provided: `usecase` that generated this `batch job` def initialize(e, name:, type:, sets:, usecase: nil) raise "A name is required to refer a job. Given: #{name}" if !name - raise "Type should be one of #{self.class.types}. Given: #{type}" if !self.class.valid_type?(type) - raise "Sets should be some of #{self.class.sets}. Given: #{sets}" if !self.class.valid_sets?(sets) + raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type) + raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets) raise "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase) super(e) - @name = name - @type = type - @usecase = usecase - @sets = [sets].flatten.compact + @name = name + @type = type + @sets = [sets].flatten.compact + @usecase = usecase + @feedback = Eco::API::Session::Batch::Feedback.new(job: self) reset end def reset @queue = [] @@ -42,27 +54,26 @@ @callbacks = {} @pending = true @status = nil end + # @return [Boolean] was this `batch job` generated by a `usecase`? (`Eco::API::UseCases::UseCase`) def usecase? !!usecase end + # @return [Hash] options the root `usecase` is run with def options usecase?? usecase.options : {} end - def signature - "Batch job \"#{name}\" ['#{type.to_s.upcase}': #{sets_title}]" - end - def match?(type:, sets:) sets = [sets].flatten - type == self.type && (sets.order == @sets.order) + type == self.type && (sets.order == self.sets.order) end + # @return [Boolean] has been this `batch job` launched? def pending? @pending end # Adds an entry(ies) to the job queue. @@ -78,27 +89,44 @@ else unless !entry unless unique && @queue_hash.key?(entry) @queue_hash[entry] = true @queue.push(entry) - @callbacks[entry] = Proc.new if block_given? + @callbacks[entry] = Proc.new if block_given? end end end end + # Helper/shortcut to obtain a people object out of `input` + # @note if `input` is not provided, it will use `queue` + # @return [Eco::API::Organization::People] def people(input = @queue) Eco::API::Organization::People.new(input) end + # Processes the `queue` and, unless `simulate` is `true`, launches against the server: + # 1. if the entries of `queue` got pending _callbacks_ (delayed changes), it processes them + # 2. unless type == `:create`: if there's a defined `api_excluded` _callback_ it calls it (see `Eco::API::Session::Config::People#api_excluded`) + # 3. transforms the result to a `Eco::API::Organization::People` object + # 4. if there are `api policies` defined, it passes the entries through them in order (see `Eco::API::Session::Config#policies`) + # 5. at this point all the transformations have taken place... + # 6. only include the entries that, after all above, still hold pending changes (`!as_update.empty?`) to be launched as update + # 7. if we are **not** in `dry-run` (or `simulate`), launch the batch request against the server (see `Eco::API::Session::Batch#launch`) + # 8. next, it links the resulting batch `status` to this `Batch::Job` (see `Eco::API::Session::Batch::Status`) + # 9. the post launch kicks in, and for success requests, it consolidates the associated entries (see `Ecoportal::API::V1::Person#consolidate!`) + # 10. launches specific error handlers, if there were **errors** from the Server as a result of the `batch.launch`, and there are `Error::Handlers` defined + # 11. if we are **not** in `dry-run` (or `simulate`), it backs up the raw queries launched to the Server def launch(simulate: false) - pqueue = processed_queue - launch_feedback(pqueue, simulate ? 2500 : 800) + pqueue = processed_queue + requests = pqueue.map {|e| as_update(e)} + pre_checks(requests, simulate: simulate) + if !simulate if pqueue.length > 0 - backup_update(pqueue) + backup_update(requests) @status = session.batch.launch(pqueue, method: type) @status.root = self end end @@ -109,112 +137,87 @@ return @status end private + def as_update(*args) + feedback.as_update(*args) + end + def processed_queue @queue.each {|e| @callbacks[e].call(e) if @callbacks.key?(e) } apply_policies(api_included(@queue)).select {|e| !as_update(e).empty?} end - def post_launch(queue: [], simulate: false) - if !simulate && @status - @status.queue.map do |entry| - if @status.success?(entry) - entry.consolidate! if entry.respond_to?(:consolidate!) - #else # do not entry.reset! (keep track on changes still) - end - end - # launch_error handlers - handlers = session.config.error_handlers - if @status.errors.any? && !handlers.empty? - err_types = @status.errors.by_type - handlers.each do |handler| - if entries = err_types[handler.name] - handler.launch(people: people(entries), session: session, options: options) - end - end - end - elsif simulate - queue.map do |entry| - entry.consolidate! if entry.respond_to?(:consolidate!) - end - end - end - # if there is a config definition to exclude entries # and the current batch is not a creation batch # - filter out excluded entries from the api update def api_included(full_queue) return full_queue if type == :create return full_queue unless excluded = session.config.people.api_excluded full_queue.select {|entry| !excluded.call(entry, session, options, self)} end def apply_policies(pre_queue) - #pre_queue.tap do |entries| people(pre_queue).tap do |entries| policies = session.config.policies - unless policies.empty? + unless policies.empty? || options.dig(:skip, :api_policies) policies.launch(people: entries, session: session, options: options) end end end - def as_update(entry) - hash = entry if entry.is_a?(Hash) - if only_ids? - hash = entry.as_json.slice("id", "external_id", "email") - else - if entry.is_a?(Ecoportal::API::V1::Person) - hash = entry.as_update - if hfields = hash.dig("details", "fields") - hash["details"]["fields"] = hfields.map do |fld| - fld.merge!("alt_id" => entry.details.get_field(fld["id"]).alt_id) if entry.details - end - end - end - - fields = hash&.dig('details', 'fields') - fields&.map! { |fld| fld&.slice("id", "alt_id", "value") } + def batch_policy + unless options.dig(:skip, :batch_policy) + @batch_policy ||= session.config.batch_policies[self.type] end - hash || {} end - def only_ids? - [:delete, :get].include?(type) - end + def pre_checks(requests, simulate: false) + only_stats = options.dig(:feedback, :only_stats) + max_chars = simulate ? 2500 : 800 + msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats) + logger.info(msg) - def sets_title - "#{@sets.map {|s| s.to_s}.join(", ")}" + @request_stats = feedback.request_stats(requests) + if simulate && batch_policy && !batch_policy.compliant?(@request_stats) + logger.warn("Batch Policy Uncompliance: this and next batches will be aborted!") + logger.warn(batch_policy.uncompliance(@request_stats)) + elsif batch_policy + # will throw an Exception if the policy request_stats is not compliant + batch_policy.validate!(@request_stats) + end end - def launch_feedback(data, max_chars = 800) - if !data || !data.is_a?(Enumerable) || data.empty? - logger.warn("#{"*" * 20} Nothing for #{signature} so far :) #{"*" * 20}") - return + def post_launch(queue: [], simulate: false) + if !simulate && @status + @status.queue.map do |entry| + if @status.success?(entry) + entry.consolidate! if entry.respond_to?(:consolidate!) + #else # do not entry.reset! (keep track on changes still) + end + end + # launch_error handlers + handlers = session.config.error_handlers + if @status.errors.any? && !handlers.empty? + err_types = @status.errors.by_type + handlers.each do |handler| + if entries = err_types[handler.name] + handler.launch(people: people(entries), session: session, options: options) + end + end + end + elsif simulate + queue.map do |entry| + entry.consolidate! if entry.respond_to?(:consolidate!) + end end - header = ("*" * 20) + " #{signature} - Feedback Sample " + ("*" * 20) - logger.info(header) - - sample_length = 1 - sample = data.slice(0, 20).map do |entry| - update = as_update(entry) - max_chars -= update.pretty_inspect.length - sample_length += 1 if max_chars > 0 - update - end - - logger.info("#{sample.slice(0, sample_length).pretty_inspect}") - logger.info("#{type.to_s.upcase} length: #{data.length}") - logger.info("*" * header.length) end - def backup_update(data) - data_body = data.map { |u| as_update(u) } + def backup_update(requests) dir = config.people.requests_folder file = File.join(dir, "#{type}_data.json") - file_manager.save_json(data_body, file, :timestamp) + file_manager.save_json(requests, file, :timestamp) end end end end