lib/eco/api/session/batch.rb in eco-helpers-3.0.16 vs lib/eco/api/session/batch.rb in eco-helpers-3.0.17

- old
+ new

@@ -1,282 +1,25 @@ module Eco module API class Session class Batch < Common::Session::BaseSession - DEFAULT_BATCH_SIZE = 50 - DEFAULT_JOB_SIZE = 100 - VALID_METHODS = %i[get create update upsert delete].freeze + require_relative 'batch/launcher' + require_relative 'batch/searcher' - class << self - # @return [Boolean] `true` if the method is supported, `false` otherwise. - def valid_method?(value) - VALID_METHODS.include?(value) - end - end + include Launcher + include Searcher - def batch_size(opts = options) - return self.class::DEFAULT_JOB_SIZE if job_mode?(opts) - - self.class::DEFAULT_BATCH_SIZE - end - - # @return [Symbol] the batch mode to run - def batch_mode(opts = options) - opts.dig(:workflow, :batch, :mode) || :batch - end - - # @return [Boolean] are we running in `:job` mode? - def job_mode?(opts = options) - batch_mode(opts) == :job - end - - # Gets the _people_ of the organization according `params`. - # If `people` is not `nil`, scopes to only the people specified. - # @note - # - If `people` is given keys `page:` and `q` of `params:`. - # @param people [Nil, People, Enumerable<Person>, Enumerable<Hash>] target _People_ to launch the batch against. - # @param params [Hash] api request options. - # @option params [String] :page the page number `page` based on `:per_page`. - # @option params [String] :per_page the number of people included per each batch api request. - # @option params [String] :q some text to search. Omit this parameter to target all the people. - # @return [Array<People>] all the people based on `params` - def get_people(people = nil, params: {}, silent: false, options: self.options) - return get(params: params, silent: silent, options: options) unless people.is_a?(Enumerable) - - launch( - people, - method: :get, - params: params, - silent: silent, - options: options - ).people - end - # launches a batch of `method` type using `people` and the specified `params` # @raise Exception # - if `people` is `nil` or is not an `Enumerable`. # - if there's no `api` connection linked to the current `Batch`. # @param people [People, Enumerable<Person>, Enumerable<Hash>] target _People_ to launch the batch against. # @param method [Symbol] the method to launch the batch api request with. # @param params [Hash] api request options. # @option params [String] :per_page the number of people included per each batch api request. # @return [Batch::Status] the `status` of this batch launch. - def launch(people, method:, params: {}, silent: false, options: self.options) - batch_from( - people, - method: method, - params: params, - silent: silent, - options: options - ) - end - - def search(data, silent: false, params: {}, options: self.options) # rubocop:disable Metrics/AbcSize - params = {per_page: batch_size(options)}.merge(params) - - launch( - data, - method: :get, - params: params, - silent: silent, - options: options - ).tap do |status| - status.mode = :search - - entries = status.queue - puts "\n" - entries.each_with_index do |entry, i| - if (i % 10).zero? - percent = i * 100 / entries.length - print "Searching: #{percent.round}% (#{i}/#{entries.length} entries)\r" - $stdout.flush - end - - next if status.success?(entry) - - email = nil - if entry.respond_to?(:email) - email = entry.email - elsif entry.respond_to?(:to_h) - email = entry.to_h["email"] - end - - people_matching = [] - email = email.to_s.strip.downcase - unless email.empty? - people_matching = get( - params: params.merge(q: email), - silent: silent, - options: options - ).select do |person| - person.email == email - end - end - - case people_matching.length - when 1 - status.set_person_match(entry, people_matching.first) - when 2..Float::INFINITY - status.set_people_match(entry, people_matching) - end - end - end - end - - private - - def get(params: {}, silent: false, options: self.options) - msg = "cannot batch get without api connnection, please provide a valid api connection!" - fatal msg unless (people_api = api&.people) - - params = {per_page: batch_size(options)}.merge(params) - people_api.get_all(params: params, silent: silent) - end - - def batch_from( - data, - method:, - params: {}, - silent: false, - options: self.options - ) - fatal "Invalid batch method: #{method}." unless self.class.valid_method?(method) - return nil if !data || !data.is_a?(Enumerable) - - msg = "cannot batch #{method} without api connnection, please provide a valid api connection!" - fatal msg unless (people_api = api&.people) - - # param q does not make sense here, even for GET method - params = {per_page: batch_size(options)}.merge(params) - per_page = params[:per_page] || batch_size(options) - - launch_batch( - data, - method: method, - per_page: per_page, - people_api: people_api, - silent: silent, - options: options - ) - end - - # Default way to retrieve options (unless provided) - def options - ASSETS.cli.options - end - - def launch_batch( # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - data, - method:, - status: nil, - job_mode: true, # rubocop:disable Lint/UnusedMethodArgument - options: self.options, - per_page: batch_size(options), - people_api: api&.people, - silent: false - ) - iteration = 1 - done = 0 - iterations = (data.length.to_f / per_page).ceil - - status ||= Eco::API::Session::Batch::Status.new( - enviro, - queue: data, - method: method - ) - - status.tap do - pending_for_server_error = data.to_a[0..] - - start_time = Time.now - - data.each_slice(per_page) do |slice| - msg = "starting batch '#{method}' iteration #{iteration}/#{iterations}, " - msg << "with #{slice.length} entries of #{data.length} -- #{done} done" - msg << (" " * 20) - log(:info) { msg } unless silent - - start_slice = Time.now - - offer_retry_on(Ecoportal::API::Errors::TimeOut) do - people_api.batch(job_mode: job_mode?(options)) do |batch| - slice.each do |person| - batch.public_send(method, person) do |response| - faltal("Request with no response") unless response - - next if server_error?(response) - - pending_for_server_error.delete(person) - status[person] = response - end - end - end # end batch - end - - done += slice.length - - msg = " ... iteration #{iteration}/#{iterations} done " - msg << "in #{str_stats(start_slice, slice.length)} " - msg << "(average: #{str_stats(start_time, done)})" - msg << (" " * 20) - log(:info) { msg } unless silent - - iteration += 1 - end # next slice - - # temporary working around (due to back-end problems with batch/jobs) - unless pending_for_server_error.empty? - msg = "Going to re-try #{pending_for_server_error.count} due to server errors" - log(:info) { msg } unless silent - - launch_batch( - pending_for_server_error, - status: status, - method: method, - job_mode: false, - per_page: per_page, - people_api: people_api, - silent: silent, - options: options - ) - end - end - end - - def server_error?(response) - res_status = response.status - server_error = !res_status || res_status.server_error? - other_error = !server_error && (!res_status.code || res_status.code < 100) - no_body = !server_error && !other_error && !response.body - server_error || other_error || no_body - end - - def offer_retry_on(error_type, retries_left = 3, &block) - yield - rescue error_type => err - raise err.class, err.message, cause: nil unless retries_left.positive? - - explanation = "#{err}\n" - explanation << "You have #{retries_left} retries left." - question = " Do you want to retry (y/N)?" - - prompt_user(question, default: "Y", explanation: explanation, timeout: 10) do |response| - raise unless response.upcase.start_with?("Y") - - puts "\nOkay... let's retry!" - offer_retry_on(error_type, retries_left - 1, &block) - end - end - - def str_stats(start, count) - now = Time.now - secs = (now - start).round(2) - if secs > 0.0 - per_sec = (count.to_f / secs).round(2) - "#{secs}s -> #{per_sec} people/s" - else - " -- " - end + def launch(...) + batch_from(...) end end end end end