module Eco
  module API
    class Session
      # Launches batch requests of people against the people api of the session.
      # Requests are sent in slices (pages) and their responses are collected
      # into a `Batch::Status`.
      class Batch < Common::Session::BaseSession
        DEFAULT_BATCH_SIZE = 50
        DEFAULT_JOB_SIZE   = 100
        VALID_METHODS      = %i[get create update upsert delete].freeze

        class << self
          # @param value [Symbol] the batch method to check.
          # @return [Boolean] `true` if the method is supported, `false` otherwise.
          def valid_method?(value)
            VALID_METHODS.include?(value)
          end
        end

        # @param opts [Hash] the options to read the batch mode from.
        # @return [Integer] `DEFAULT_JOB_SIZE` when in `:job` mode,
        #   `DEFAULT_BATCH_SIZE` otherwise.
        def batch_size(opts = options)
          return self.class::DEFAULT_JOB_SIZE if job_mode?(opts)

          self.class::DEFAULT_BATCH_SIZE
        end

        # @param opts [Hash] the options to read `[:workflow][:batch][:mode]` from.
        # @return [Symbol] the batch mode to run (`:batch` when not specified).
        def batch_mode(opts = options)
          opts.dig(:workflow, :batch, :mode) || :batch
        end

        # @param opts [Hash] the options to read the batch mode from.
        # @return [Boolean] are we running in `:job` mode?
        def job_mode?(opts = options)
          batch_mode(opts) == :job
        end

        # Gets the _people_ of the organization according to `params`.
        # If `people` is an `Enumerable`, scopes to only the people specified.
        # @note
        #   - If `people` is given, only the key `:per_page` of `params` is used
        #     (keys such as `:page` and `:q` are not forwarded to the batch request).
        # @param people [nil, People, Enumerable] target _People_ to launch the batch against.
        # @param params [Hash] api request options.
        # @option params [String] :page the page number `page` based on `:per_page`.
        # @option params [String] :per_page the number of people included per each batch api request.
        # @option params [String] :q some text to search. Omit this parameter to target all the people.
        # @param silent [Boolean] forwarded to the underlying request / batch launch.
        # @param options [Hash] the options used to resolve the batch mode and size.
        # @return [Array] all the people based on `params`
        def get_people(people = nil, params: {}, silent: false, options: self.options)
          return get(params: params, silent: silent, options: options) unless people.is_a?(Enumerable)

          launch(
            people,
            method:  :get,
            params:  params,
            silent:  silent,
            options: options
          ).people
        end

        # Launches a batch of `method` type using `people` and the specified `params`.
        # @note the call is aborted via `fatal`
        #   - if `method` is not one of `VALID_METHODS`.
        #   - if there's no `api` connection linked to the current `Batch`.
        # @param people [People, Enumerable] target _People_ to launch the batch against.
        # @param method [Symbol] the method to launch the batch api request with.
        # @param params [Hash] api request options.
        # @option params [String] :per_page the number of people included per each batch api request.
        # @param silent [Boolean] when `true`, the progress messages are not logged.
        # @param options [Hash] the options used to resolve the batch mode and size.
        # @return [Batch::Status, nil] the `status` of this batch launch;
        #   `nil` if `people` is `nil` or is not an `Enumerable`.
        def launch(people, method:, params: {}, silent: false, options: self.options)
          batch_from(
            people,
            method:  method,
            params:  params,
            silent:  silent,
            options: options
          )
        end

        # Launches a `:get` batch for `data` and, for every entry that did not succeed,
        # falls back to a search by email (query `q`), keeping only the people whose
        # `email` equals the entry's email. The matches found are registered in the status.
        # Progress is printed to standard output every 10 entries.
        # @param data [Enumerable] the entries to search for.
        # @param silent [Boolean] forwarded to the batch launch and the api requests.
        # @param params [Hash] api request options (`:per_page` defaults to `batch_size`).
        # @param options [Hash] the options used to resolve the batch mode and size.
        # @return [Batch::Status] the `status` of the launch, with its `mode` set to `:search`.
        def search(data, silent: false, params: {}, options: self.options) # rubocop:disable Metrics/AbcSize
          params = {per_page: batch_size(options)}.merge(params)

          launch(
            data,
            method:  :get,
            params:  params,
            silent:  silent,
            options: options
          ).tap do |status|
            status.mode = :search

            entries = status.queue
            puts "\n"
            entries.each_with_index do |entry, i|
              if (i % 10).zero?
                percent = i * 100 / entries.length
                print "Searching: #{percent.round}% (#{i}/#{entries.length} entries)\r"
                $stdout.flush
              end

              # entries already found by the batch need no email look up
              next if status.success?(entry)

              email = nil
              if entry.respond_to?(:email)
                email = entry.email
              elsif entry.respond_to?(:to_h)
                email = entry.to_h["email"]
              end

              people_matching = []
              email           = email.to_s.strip.downcase

              unless email.empty?
                # NOTE(review): `person.email` is compared as-is against the stripped and
                # downcased email -- confirm the api returns normalized emails.
                people_matching = get(
                  params:  params.merge(q: email),
                  silent:  silent,
                  options: options
                ).select do |person|
                  person.email == email
                end
              end

              case people_matching.length
              when 1
                status.set_person_match(entry, people_matching.first)
              when 2..Float::INFINITY
                status.set_people_match(entry, people_matching)
              end
            end
          end
        end

        private

        # Retrieves all the people matching `params` through the people api.
        # Aborts via `fatal` when there is no api connection.
        # @param params [Hash] api request options (`:per_page` defaults to `batch_size`).
        # @param silent [Boolean] forwarded to the api request.
        # @param options [Hash] the options used to resolve the batch size.
        # @return the result of `get_all` on the people api.
        def get(params: {}, silent: false, options: self.options)
          msg = "cannot batch get without api connnection, please provide a valid api connection!"
          fatal msg unless (people_api = api&.people)

          params = {per_page: batch_size(options)}.merge(params)
          people_api.get_all(params: params, silent: silent)
        end

        # Validates the request and launches the batch.
        # Aborts via `fatal` on an invalid `method` or a missing api connection.
        # @param data [Enumerable] the entries to launch the batch against.
        # @param method [Symbol] one of `VALID_METHODS`.
        # @param params [Hash] api request options; only `:per_page` is used.
        # @param silent [Boolean] when `true`, the progress messages are not logged.
        # @param options [Hash] the options used to resolve the batch mode and size.
        # @return [Batch::Status, nil] `nil` when `data` is not an `Enumerable`.
        def batch_from(
          data,
          method:,
          params: {},
          silent: false,
          options: self.options
        )
          fatal "Invalid batch method: #{method}." unless self.class.valid_method?(method)
          return nil if !data || !data.is_a?(Enumerable)

          msg = "cannot batch #{method} without api connnection, please provide a valid api connection!"
          fatal msg unless (people_api = api&.people)

          # param q does not make sense here, even for GET method
          params   = {per_page: batch_size(options)}.merge(params)
          # covers the case where `params` explicitly carries a `nil` :per_page
          per_page = params[:per_page] || batch_size(options)

          launch_batch(
            data,
            method:     method,
            per_page:   per_page,
            people_api: people_api,
            silent:     silent,
            options:    options
          )
        end

        # Default way to retrieve options (unless provided)
        def options
          ASSETS.cli.options
        end

        # Sends `data` to the api in slices of `per_page` entries, storing each response
        # in `status`. Entries whose response is considered a server error
        # (see `server_error?`) are collected and re-launched in a further call.
        # @note NOTE(review): `job_mode` is accepted but not used; the mode is always
        #   resolved via `job_mode?(options)`, so the retry launched with
        #   `job_mode: false` runs in the same mode -- confirm this is intended.
        # @param data [Enumerable] the entries to process (must respond to `length`).
        # @param method [Symbol] the batch method to call per entry.
        # @param status [Batch::Status, nil] status to reuse; a new one is created when `nil`.
        # @param job_mode [Boolean] currently unused.
        # @param options [Hash] the options used to resolve the batch mode and size.
        # @param per_page [Integer] number of entries per api request.
        # @param people_api the people api to launch the batch requests with.
        # @param silent [Boolean] when `true`, the progress messages are not logged.
        # @return [Batch::Status] the status with the collected responses.
        def launch_batch( # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
          data,
          method:,
          status: nil,
          job_mode: true, # rubocop:disable Lint/UnusedMethodArgument
          options: self.options,
          per_page: batch_size(options),
          people_api: api&.people,
          silent: false
        )
          iteration  = 1
          done       = 0
          iterations = (data.length.to_f / per_page).ceil

          status ||= Eco::API::Session::Batch::Status.new(
            enviro,
            queue:  data,
            method: method
          )

          status.tap do
            # work on a copy: entries are removed as soon as they get a proper response
            pending_for_server_error = data.to_a[0..]

            start_time = Time.now

            data.each_slice(per_page) do |slice|
              msg  = "starting batch '#{method}' iteration #{iteration}/#{iterations}, "
              msg << "with #{slice.length} entries of #{data.length} -- #{done} done"
              msg << (" " * 20)
              log(:info) { msg } unless silent

              start_slice = Time.now

              offer_retry_on(Ecoportal::API::Errors::TimeOut) do
                people_api.batch(job_mode: job_mode?(options)) do |batch|
                  slice.each do |person|
                    batch.public_send(method, person) do |response|
                      fatal("Request with no response") unless response
                      next if server_error?(response)

                      pending_for_server_error.delete(person)
                      status[person] = response
                    end
                  end
                end # end batch
              end

              done += slice.length

              msg  = " ... iteration #{iteration}/#{iterations} done "
              msg << "in #{str_stats(start_slice, slice.length)} "
              msg << "(average: #{str_stats(start_time, done)})"
              msg << (" " * 20)
              log(:info) { msg } unless silent

              iteration += 1
            end # next slice

            # temporary working around (due to back-end problems with batch/jobs)
            unless pending_for_server_error.empty?
              msg = "Going to re-try #{pending_for_server_error.count} due to server errors"
              log(:info) { msg } unless silent

              launch_batch(
                pending_for_server_error,
                status:     status,
                method:     method,
                job_mode:   false,
                per_page:   per_page,
                people_api: people_api,
                silent:     silent,
                options:    options
              )
            end
          end
        end

        # @param response the response to a single entry of a batch request.
        # @return [Boolean] `true` when the response has no status, its status is a
        #   server error, its status code is missing or below 100, or it has no body.
        def server_error?(response)
          res_status   = response.status
          server_error = !res_status || res_status.server_error?
          other_error  = !server_error && (!res_status.code || res_status.code < 100)
          no_body      = !server_error && !other_error && !response.body
          server_error || other_error || no_body
        end

        # Runs the block and, if it raises `error_type`, asks the user whether to retry.
        # When no retries are left, a new error of the same class and message
        # (with no `cause`) is raised instead of prompting.
        # @param error_type [Class] the error class to rescue.
        # @param retries_left [Integer] the number of retries that can still be offered.
        # @yield the operation to run (and re-run on retry).
        def offer_retry_on(error_type, retries_left = 3, &block)
          yield
        rescue error_type => err
          raise err.class, err.message, cause: nil unless retries_left.positive?

          explanation  = "#{err}\n"
          explanation << "You have #{retries_left} retries left."
          # NOTE(review): the question advertises `N` as the default answer while
          # `default: "Y"` is passed to `prompt_user` -- confirm which one is intended.
          question     = " Do you want to retry (y/N)?"

          prompt_user(question, default: "Y", explanation: explanation, timeout: 10) do |response|
            # bare `raise`: presumably re-raises the rescued error (assumes the block
            # runs while still inside this `rescue`)
            raise unless response.upcase.start_with?("Y")

            puts "\nOkay... let's retry!"
            offer_retry_on(error_type, retries_left - 1, &block)
          end
        end

        # @param start [Time] the moment the measured work started.
        # @param count [Integer] the number of entries processed since `start`.
        # @return [String] the elapsed seconds and the people per second rate,
        #   or `" -- "` when the elapsed time rounds to zero.
        def str_stats(start, count)
          now  = Time.now
          secs = (now - start).round(2)
          if secs > 0.0
            per_sec = (count.to_f / secs).round(2)
            "#{secs}s -> #{per_sec} people/s"
          else
            " -- "
          end
        end
      end
    end
  end
end

require_relative 'batch/job'
require_relative 'batch/feedback'
require_relative 'batch/request_stats'
require_relative 'batch/base_policy'
require_relative 'batch/policies'
require_relative 'batch/status'
require_relative 'batch/errors'
require_relative 'batch/jobs'
require_relative 'batch/jobs_groups'