module Eco
  module API
    class Session
      class Batch
        class Job < API::Common::Session::BaseSession
          @types = %i[get create update delete]
          @sets  = %i[core details account]

          class << self
            attr_reader :types, :sets

            def valid_type?(value)
              types.include?(value)
            end

            def valid_sets?(value)
              sts = [value].flatten
              sts.all? { |s| sets.include?(s) }
            end
          end

          attr_reader :name, :type, :status
          attr_reader :usecase

          def initialize(e, name:, type:, sets:, usecase: nil)
            raise "A name is required to refer to a job. Given: #{name}" unless name
            raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type)
            raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets)
            raise "usecase must be an Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)

            super(e)
            @name    = name
            @type    = type
            @usecase = usecase
            @sets    = [sets].flatten.compact
            reset
          end

          def reset
            @queue      = []
            @queue_hash = {}
            @callbacks  = {}
            @pending    = true
            @status     = nil
          end

          def usecase?
            !!usecase
          end

          def options
            usecase? ? usecase.options : {}
          end

          def signature
            "Batch job \"#{name}\" ['#{type.to_s.upcase}': #{sets_title}]"
          end

          # Order-insensitive comparison against the job's `type` and `sets`.
          def match?(type:, sets:)
            sets = [sets].flatten
            type == self.type && sets.sort == @sets.sort
          end

          def pending?
            @pending
          end

          # Adds one or more entries to the job queue.
          # @param entry [Person, Enumerable<Person>] the person(s) to be updated, carrying the changes to be done.
          # @param unique [Boolean] whether repeated entries should be avoided in the queue.
          # @yield [person] callback before launching the batch job request against the server.
          # @yieldparam person [Person] the current person object to be treated by the callback before launching the batch.
          # @return [void]
          def add(entry, unique: true, &block)
            case entry
            when Enumerable
              entry.each { |e| add(e, unique: unique, &block) }
            else
              if entry && !(unique && @queue_hash.key?(entry))
                @queue_hash[entry] = true
                @queue.push(entry)
                @callbacks[entry] = block if block
              end
            end
          end

          def people(input = @queue)
            Eco::API::Organization::People.new(input)
          end
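          # Launches the batch job against the server (or just logs feedback when simulating).
          #
          # A minimal usage sketch, assuming hypothetical `enviro` and `person`
          # objects provided elsewhere by the session (they are not defined in
          # this file), with an illustrative `supervisor_id` change:
          #
          # @example
          #   job = Eco::API::Session::Batch::Job.new(
          #     enviro,
          #     name: "update-supervisors",
          #     type: :update,
          #     sets: [:core]
          #   )
          #   job.add(person) { |p| p.supervisor_id = "sup-123" }
          #   status = job.launch(simulate: true)
          #
          # @param simulate [Boolean] when `true`, nothing is sent to the server;
          #   only the feedback sample is logged.
          # @return [Object, nil] the status of the launch (as returned by
          #   `session.batch.launch`), or `nil` when nothing was launched
          #   (empty queue or simulate mode).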
          def launch(simulate: false)
            pqueue = processed_queue
            launch_feedback(pqueue, simulate ? 2500 : 800)

            if !simulate && !pqueue.empty?
              backup_update(pqueue)
              @status = session.batch.launch(pqueue, method: type)
              @status.root = self
            end

            post_launch(queue: pqueue, simulate: simulate)
            logger.info("Simulate: this would have launched: '#{type}'") if simulate
            @pending = false
            @status
          end

          private

          def processed_queue
            @queue.each { |e| @callbacks[e].call(e) if @callbacks.key?(e) }
            apply_policies(api_included(@queue)).reject { |e| as_update(e).empty? }
          end

          def post_launch(queue: [], simulate: false)
            if !simulate && @status
              @status.queue.each do |entry|
                next unless @status.success?(entry)
                entry.consolidate! if entry.respond_to?(:consolidate!)
                # on failure: do not `entry.reset!` (keep tracking the changes)
              end
              # launch the error handlers
              handlers = session.config.error_handlers
              if @status.errors.any? && !handlers.empty?
                err_types = @status.errors.by_type
                handlers.each do |handler|
                  if (entries = err_types[handler.name])
                    handler.launch(people: people(entries), session: session, options: options)
                  end
                end
              end
            elsif simulate
              queue.each do |entry|
                entry.consolidate! if entry.respond_to?(:consolidate!)
              end
            end
          end

          # If there is a config definition to exclude entries
          # and the current batch is not a creation batch:
          # - filter out the excluded entries from the api update.
          def api_included(full_queue)
            return full_queue if type == :create
            return full_queue unless (excluded = session.config.people.api_excluded)
            full_queue.reject { |entry| excluded.call(entry, session, options, self) }
          end

          def apply_policies(pre_queue)
            people(pre_queue).tap do |entries|
              policies = session.config.policies
              unless policies.empty?
                policies.launch(people: entries, session: session, options: options)
              end
            end
          end

          def as_update(entry)
            hash = entry if entry.is_a?(Hash)
            if only_ids?
              hash = entry.as_json.slice("id", "external_id", "email")
            else
              if entry.is_a?(Ecoportal::API::V1::Person)
                hash = entry.as_update
                # annotate each changed detail field with its `alt_id`
                if entry.details && (hfields = hash.dig("details", "fields"))
                  hfields.each do |fld|
                    fld["alt_id"] = entry.details.get_field(fld["id"]).alt_id
                  end
                end
              end
              fields = hash&.dig("details", "fields")
              fields&.map! { |fld| fld&.slice("id", "alt_id", "value") }
            end
            hash || {}
          end

          def only_ids?
            [:delete, :get].include?(type)
          end

          def sets_title
            @sets.map(&:to_s).join(", ")
          end

          def launch_feedback(data, max_chars = 800)
            unless data.is_a?(Enumerable) && !data.empty?
              logger.warn("#{"*" * 20} Nothing for #{signature} so far :) #{"*" * 20}")
              return
            end

            header = ("*" * 20) + " #{signature} - Feedback Sample " + ("*" * 20)
            logger.info(header)

            sample_length = 1
            sample = data.slice(0, 20).map do |entry|
              update = as_update(entry)
              max_chars -= update.pretty_inspect.length
              sample_length += 1 if max_chars > 0
              update
            end
            logger.info(sample.slice(0, sample_length).pretty_inspect)
            logger.info("#{type.to_s.upcase} length: #{data.length}")
            logger.info("*" * header.length)
          end

          def backup_update(data)
            data_body = data.map { |u| as_update(u) }
            dir  = config.people.requests_folder
            file = File.join(dir, "#{type}_data.json")
            file_manager.save_json(data_body, file, :timestamp)
          end
        end
      end
    end
  end
end
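# Illustrative shapes of the payloads that `as_update` produces (values are
# made up for illustration; the exact keys depend on what
# `Ecoportal::API::V1::Person#as_update` reports as changed):
#
# - `:get`/`:delete` jobs (`only_ids?`) reduce each entry to its identifiers:
#     { "id" => "abc123", "external_id" => "jdoe", "email" => "jdoe@example.com" }
# - Other jobs slice each detail field down to what the update needs:
#     { "details" => { "fields" => [{ "id" => "1", "alt_id" => "role", "value" => "Manager" }] } }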