lib/eco/api/session/batch.rb in eco-helpers-3.0.16 vs lib/eco/api/session/batch.rb in eco-helpers-3.0.17
- old
+ new
@@ -1,282 +1,25 @@
module Eco
module API
class Session
class Batch < Common::Session::BaseSession
- DEFAULT_BATCH_SIZE = 50
- DEFAULT_JOB_SIZE = 100
- VALID_METHODS = %i[get create update upsert delete].freeze
+ require_relative 'batch/launcher'
+ require_relative 'batch/searcher'
- class << self
- # @return [Boolean] `true` if the method is supported, `false` otherwise.
- def valid_method?(value)
- VALID_METHODS.include?(value)
- end
- end
+ include Launcher
+ include Searcher
- def batch_size(opts = options)
- return self.class::DEFAULT_JOB_SIZE if job_mode?(opts)
-
- self.class::DEFAULT_BATCH_SIZE
- end
-
- # @return [Symbol] the batch mode to run
- def batch_mode(opts = options)
- opts.dig(:workflow, :batch, :mode) || :batch
- end
-
- # @return [Boolean] are we running in `:job` mode?
- def job_mode?(opts = options)
- batch_mode(opts) == :job
- end
-
- # Gets the _people_ of the organization according `params`.
- # If `people` is not `nil`, scopes to only the people specified.
- # @note
- # - If `people` is given keys `page:` and `q` of `params:`.
- # @param people [Nil, People, Enumerable<Person>, Enumerable<Hash>] target _People_ to launch the batch against.
- # @param params [Hash] api request options.
- # @option params [String] :page the page number `page` based on `:per_page`.
- # @option params [String] :per_page the number of people included per each batch api request.
- # @option params [String] :q some text to search. Omit this parameter to target all the people.
- # @return [Array<People>] all the people based on `params`
- def get_people(people = nil, params: {}, silent: false, options: self.options)
- return get(params: params, silent: silent, options: options) unless people.is_a?(Enumerable)
-
- launch(
- people,
- method: :get,
- params: params,
- silent: silent,
- options: options
- ).people
- end
-
# launches a batch of `method` type using `people` and the specified `params`
# @raise Exception
# - if `people` is `nil` or is not an `Enumerable`.
# - if there's no `api` connection linked to the current `Batch`.
# @param people [People, Enumerable<Person>, Enumerable<Hash>] target _People_ to launch the batch against.
# @param method [Symbol] the method to launch the batch api request with.
# @param params [Hash] api request options.
# @option params [String] :per_page the number of people included per each batch api request.
# @return [Batch::Status] the `status` of this batch launch.
- def launch(people, method:, params: {}, silent: false, options: self.options)
- batch_from(
- people,
- method: method,
- params: params,
- silent: silent,
- options: options
- )
- end
-
- def search(data, silent: false, params: {}, options: self.options) # rubocop:disable Metrics/AbcSize
- params = {per_page: batch_size(options)}.merge(params)
-
- launch(
- data,
- method: :get,
- params: params,
- silent: silent,
- options: options
- ).tap do |status|
- status.mode = :search
-
- entries = status.queue
- puts "\n"
- entries.each_with_index do |entry, i|
- if (i % 10).zero?
- percent = i * 100 / entries.length
- print "Searching: #{percent.round}% (#{i}/#{entries.length} entries)\r"
- $stdout.flush
- end
-
- next if status.success?(entry)
-
- email = nil
- if entry.respond_to?(:email)
- email = entry.email
- elsif entry.respond_to?(:to_h)
- email = entry.to_h["email"]
- end
-
- people_matching = []
- email = email.to_s.strip.downcase
- unless email.empty?
- people_matching = get(
- params: params.merge(q: email),
- silent: silent,
- options: options
- ).select do |person|
- person.email == email
- end
- end
-
- case people_matching.length
- when 1
- status.set_person_match(entry, people_matching.first)
- when 2..Float::INFINITY
- status.set_people_match(entry, people_matching)
- end
- end
- end
- end
-
- private
-
- def get(params: {}, silent: false, options: self.options)
- msg = "cannot batch get without api connnection, please provide a valid api connection!"
- fatal msg unless (people_api = api&.people)
-
- params = {per_page: batch_size(options)}.merge(params)
- people_api.get_all(params: params, silent: silent)
- end
-
- def batch_from(
- data,
- method:,
- params: {},
- silent: false,
- options: self.options
- )
- fatal "Invalid batch method: #{method}." unless self.class.valid_method?(method)
- return nil if !data || !data.is_a?(Enumerable)
-
- msg = "cannot batch #{method} without api connnection, please provide a valid api connection!"
- fatal msg unless (people_api = api&.people)
-
- # param q does not make sense here, even for GET method
- params = {per_page: batch_size(options)}.merge(params)
- per_page = params[:per_page] || batch_size(options)
-
- launch_batch(
- data,
- method: method,
- per_page: per_page,
- people_api: people_api,
- silent: silent,
- options: options
- )
- end
-
- # Default way to retrieve options (unless provided)
- def options
- ASSETS.cli.options
- end
-
- def launch_batch( # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
- data,
- method:,
- status: nil,
- job_mode: true, # rubocop:disable Lint/UnusedMethodArgument
- options: self.options,
- per_page: batch_size(options),
- people_api: api&.people,
- silent: false
- )
- iteration = 1
- done = 0
- iterations = (data.length.to_f / per_page).ceil
-
- status ||= Eco::API::Session::Batch::Status.new(
- enviro,
- queue: data,
- method: method
- )
-
- status.tap do
- pending_for_server_error = data.to_a[0..]
-
- start_time = Time.now
-
- data.each_slice(per_page) do |slice|
- msg = "starting batch '#{method}' iteration #{iteration}/#{iterations}, "
- msg << "with #{slice.length} entries of #{data.length} -- #{done} done"
- msg << (" " * 20)
- log(:info) { msg } unless silent
-
- start_slice = Time.now
-
- offer_retry_on(Ecoportal::API::Errors::TimeOut) do
- people_api.batch(job_mode: job_mode?(options)) do |batch|
- slice.each do |person|
- batch.public_send(method, person) do |response|
- faltal("Request with no response") unless response
-
- next if server_error?(response)
-
- pending_for_server_error.delete(person)
- status[person] = response
- end
- end
- end # end batch
- end
-
- done += slice.length
-
- msg = " ... iteration #{iteration}/#{iterations} done "
- msg << "in #{str_stats(start_slice, slice.length)} "
- msg << "(average: #{str_stats(start_time, done)})"
- msg << (" " * 20)
- log(:info) { msg } unless silent
-
- iteration += 1
- end # next slice
-
- # temporary working around (due to back-end problems with batch/jobs)
- unless pending_for_server_error.empty?
- msg = "Going to re-try #{pending_for_server_error.count} due to server errors"
- log(:info) { msg } unless silent
-
- launch_batch(
- pending_for_server_error,
- status: status,
- method: method,
- job_mode: false,
- per_page: per_page,
- people_api: people_api,
- silent: silent,
- options: options
- )
- end
- end
- end
-
- def server_error?(response)
- res_status = response.status
- server_error = !res_status || res_status.server_error?
- other_error = !server_error && (!res_status.code || res_status.code < 100)
- no_body = !server_error && !other_error && !response.body
- server_error || other_error || no_body
- end
-
- def offer_retry_on(error_type, retries_left = 3, &block)
- yield
- rescue error_type => err
- raise err.class, err.message, cause: nil unless retries_left.positive?
-
- explanation = "#{err}\n"
- explanation << "You have #{retries_left} retries left."
- question = " Do you want to retry (y/N)?"
-
- prompt_user(question, default: "Y", explanation: explanation, timeout: 10) do |response|
- raise unless response.upcase.start_with?("Y")
-
- puts "\nOkay... let's retry!"
- offer_retry_on(error_type, retries_left - 1, &block)
- end
- end
-
- def str_stats(start, count)
- now = Time.now
- secs = (now - start).round(2)
- if secs > 0.0
- per_sec = (count.to_f / secs).round(2)
- "#{secs}s -> #{per_sec} people/s"
- else
- " -- "
- end
+ def launch(...)
+ batch_from(...)
end
end
end
end
end