module Eco
  module API
    class Session
      # A batch job against the people API: queues entries for one operation
      # +type+ (get/create/update/delete) restricted to some schema +sets+
      # (core/details/account), and launches them all in a single batch.
      class BatchJob < API::Common::Session::BaseSession
        TYPES = %i[get create update delete].freeze
        SETS  = %i[core details account].freeze

        attr_reader :name, :status

        class << self
          # @return [Boolean] whether +value+ is a supported job type.
          def valid_type?(value)
            TYPES.include?(value)
          end

          # @return [Boolean] whether every element of +value+ (scalar or
          #   array) is a supported schema set.
          def valid_sets?(value)
            sets = [value].flatten
            sets.all? { |s| SETS.include?(s) }
          end
        end

        # @param e [Object] environment, forwarded to the base session.
        # @param name [String] identifier used to refer to this job.
        # @param type [Symbol] one of TYPES.
        # @param sets [Symbol, Array<Symbol>] subset of SETS.
        # @raise [RuntimeError] when name is missing or type/sets are invalid.
        def initialize(e, name:, type:, sets:)
          raise "A name is required to refer a job. Given: #{name}" if !name
          raise "Type should be one of #{TYPES}. Given: #{type}" if !self.class.valid_type?(type)
          raise "Sets should be some of #{SETS}. Given: #{sets}" if !self.class.valid_sets?(sets)
          super(e)
          @name = name
          @type = type
          @sets = [sets].flatten.compact
          reset
        end

        # Empties the queue and clears all per-launch tracking state.
        def reset
          @queue      = []
          @queue_hash = {}
          @callbacks  = {}
          @status     = nil
        end

        # @return [String] human-readable identification of this job.
        def signature
          "job \"#{@name}\" ['#{@type.to_s.upcase}': #{sets_title}]"
        end

        # @return [Boolean] whether this job handles the given type/sets.
        # NOTE(review): `Array#order` is not core Ruby — presumably a gem
        #   core-extension providing order-insensitive comparison; verify it
        #   is loaded wherever this class is used.
        def match?(type:, sets:)
          sets = [sets].flatten
          type == @type && (sets.order == @sets.order)
        end

        # @return [Boolean] whether there is anything queued to be launched.
        def pending?
          @queue.length > 0
        end

        # @return [Boolean] whether the :core set is part of this job.
        # (Fixed: there is no `sets` reader defined on this class, so the
        #  predicates now read the @sets ivar directly.)
        def core?
          @sets.include?(:core)
        end

        # @return [Boolean] whether the :details set is part of this job.
        def details?
          @sets.include?(:details)
        end

        # @return [Boolean] whether the :account set is part of this job.
        def account?
          @sets.include?(:account)
        end

        # Adds an entry to the job queue.
        # @param entry [Person] the person we want to update, carrying the changes to be done.
        # @param unique [Boolean] specifies if repeated entries should be avoided in the queue.
        # @yield [person] callback before launching the batch job request against the server.
        # @yieldparam person [Person] current person object that should be treated by the callback before launching the batch.
        # @return [void]
        def add(entry, unique: true, &block)
          return unless entry
          return if unique && @queue_hash.key?(entry)
          @queue_hash[entry] = true
          @queue.push(entry)
          # Capture the block explicitly: the original bare `Proc.new`
          # raises ArgumentError on Ruby 3+.
          @callbacks[entry] = block if block
        end

        # @return [Eco::API::Organization::People] wrapper over +input+.
        def people(input = @queue)
          Eco::API::Organization::People.new(input)
        end

        # Runs the per-entry callbacks, applies the session API policies and
        # keeps only entries whose update payload is non-empty.
        # @note a dead duplicate definition of this method was removed: Ruby
        #   silently kept only this (the later) one, so behavior is unchanged.
        # @return [Array] the entries that should actually be launched.
        def processed_queue
          @queue.each { |entry| @callbacks[entry].call(entry) if @callbacks.key?(entry) }
          apply_policies(@queue).select { |entry| !as_update(entry).empty? }
        end

        # Launches the batch job against the server (unless simulating).
        # @param simulate [Boolean] when true nothing is sent; feedback only.
        # @return [Object, nil] the batch status (nil when nothing was sent).
        def launch(simulate: false)
          queue = processed_queue
          launch_feedback(queue, simulate ? 2500 : 800)

          if !simulate && queue.length > 0
            backup_update(queue)
            @status = session.batch.launch(queue, method: @type)
            @status.root = self
          end

          post_launch(queue: queue, simulate: simulate)
          logger.info("Simulate: this would have launched: '#{@type}'") if simulate
          @status
        end

        private

        # Consolidates successfully-launched entries after a real run, or all
        # queued entries when simulating (so the model reflects the would-be
        # server state).
        def post_launch(queue: [], simulate: false)
          if !simulate && @status
            @status.queue.each do |entry|
              next unless @status.success?(entry)
              entry.consolidate! if entry.respond_to?(:consolidate!)
              # else: shouldn't probably reset, as the model remains dirty
              #   (well tracked): entry.reset! if entry.respond_to?(:reset!)
            end
          elsif simulate
            queue.each do |entry|
              entry.consolidate! if entry.respond_to?(:consolidate!)
            end
          end
        end

        # Applies the session-configured API policies to the entries.
        # @return [Array] +pre_queue+ itself (policies work in place).
        def apply_policies(pre_queue)
          pre_queue.tap do |entries|
            policies = session.config.api_policies.policies
            policies.launch(people: people(entries), session: session) unless policies.empty?
          end
        end

        # Builds the request payload for +entry+.
        # For :get/:delete jobs only identification attributes are kept.
        # NOTE(review): when `entry.details` is nil the inner `map` yields
        #   nil elements into "details"/"fields" — presumably never hit for
        #   V1 people with detail fields; confirm against callers.
        # @return [Hash] the payload (empty hash when there is nothing to send).
        def as_update(entry)
          hash = entry if entry.is_a?(Hash)
          if only_ids?
            hash = entry.as_json.slice("id", "external_id", "email")
          else
            if entry.is_a?(Ecoportal::API::V1::Person)
              hash = entry.as_update
              if hfields = hash.dig("details", "fields")
                hash["details"]["fields"] = hfields.map do |fld|
                  fld.merge!("alt_id" => entry.details.get_field(fld["id"]).alt_id) if entry.details
                end
              end
            end

            fields = hash&.dig('details', 'fields')
            fields&.map! { |fld| fld&.slice("id", "alt_id", "value") }
          end
          hash || {}
        end

        # @return [Boolean] whether this job only needs identification attrs.
        def only_ids?
          [:delete, :get].include?(@type)
        end

        # @return [String] comma-separated list of this job's sets.
        def sets_title
          @sets.map(&:to_s).join(", ")
        end

        # Logs a truncated sample of the queued payloads.
        # @param data [Enumerable] the processed queue.
        # @param max_chars [Integer] rough cap on the printed sample size.
        def launch_feedback(data, max_chars = 800)
          if !data || !data.is_a?(Enumerable) || data.empty?
            logger.warn("#{"*" * 20} Nothing for #{signature} so far :) #{"*" * 20}")
            return
          end
          header = ("*" * 20) + " #{signature} - Feedback Sample " + ("*" * 20)
          logger.info(header)

          sample_length = 1
          sample = data.slice(0, 20).map do |entry|
            update = as_update(entry)
            max_chars -= update.pretty_inspect.length
            sample_length += 1 if max_chars > 0
            update
          end
          logger.info("#{sample.slice(0, sample_length).pretty_inspect}")
          logger.info("#{@type.to_s.upcase} length: #{data.length}")
          logger.info("*" * header.length)
        end

        # Backs up the exact payload about to be sent as a timestamped json
        # file in the session's requests folder.
        def backup_update(data)
          data_body = data.map { |entry| as_update(entry) }
          dir  = config.people.requests_folder
          file = File.join(dir, "#{@type}_data.json")
          file_manager.save_json(data_body, file, :timestamp)
        end
      end
    end
  end
end