lib/ecoportal/api/v1/job/awaiter.rb in ecoportal-api-0.10.5 vs lib/ecoportal/api/v1/job/awaiter.rb in ecoportal-api-0.10.6

- old
+ new

@@ -1,114 +1,85 @@ +require_relative 'awaiter/timer' +require_relative 'awaiter/status_frequency' + module Ecoportal module API class V1 class Job class Awaiter include Common::Client::TimeOut + include StatusFrequency - DELAY_STATUS_CHECK = 4 - MIN_STATUS_CHECK = 2 - TIMEOUT_APPROACH = :min # :conservative # adaptative timeout TIMEOUT_FALLBACK = :min - attr_reader :job, :job_id, :total - attr_accessor :timeout_approach + attr_reader :job_api, :job_id, :total - def initialize(job, job_id:, total:) - @job = job + def initialize(job_api, job_id:, total:) + @job_api = job_api @job_id = job_id @total = total - @checked = false - self.timeout_approach = self.class::TIMEOUT_APPROACH end + attr_writer :timeout_approach + + def timeout_approach + @timeout_approach ||= self.class::TIMEOUT_APPROACH + end + # Allows to preserve the learned throughput def new(**kargs) - self.class.new(job, **kargs).tap do |out| - out.throughput = throughput + self.class.new(job_api, **kargs).tap do |out| + out.throughput = throughput + out.timeout_approach = timeout_approach end end def await_completion! # rubocop:disable Metrics/AbcSize - max_timeout = timeout_for(total, approach: timeout_approach) + timeout = timeout_for(total, approach: timeout_approach) - # timeout library is evil. So we make poor-man timeout. - # https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/ - before = Time.now + first = 1 + timer = Timer.new( + total: total, + timeout: timeout, + ldelay: first + ) + delay_status_check = nil loop do - status = job.status(job_id) - done = status.progress - waited = Time.now - before + sleep(first.tap {first = nil}) if first - adapted = waited - adapted = waited - (delay_status_check / 2) if delay_status_check - ratio = throughput!(adapted, count: done) + timer = timer.new( + status: job_api.status(job_id), + ldelay: delay_status_check + ) - break status if status.complete?(total) + # ratio = throughput!(timer.net_waited, count: timer.progress) + ratio = throughput!(timer.lwaited, count: timer.increased) - pending = status.pending(total) - left = max_timeout - waited + break timer.status if timer.complete? - timeout!(status, timeout: max_timeout) unless left.positive? + timer.on_timeout! do + @timeout_approach = self.class::TIMEOUT_FALLBACK + end - delay_status_check = status_check_in(pending, timeout_in: left) + delay_status_check = status_check_in( + timer.pending, + timeout_in: timer.timeout_in + ) msg = " ... Awaiting #{delay_status_check} s. -- " - msg << " TimeOut: #{left.round(2)} s. " + msg << " TimeOut: #{timer.time_left} s. " msg << "(job '#{job_id}') " - msg << "Done: #{done} (est. #{ratio} rec/s) " + msg << "Done: #{timer.progress} (est. #{ratio.round(2)} rec/s) " msg << " \r" print msg $stdout.flush sleep(delay_status_check) end - end - - private - - def timeout!(status, timeout:) - self.timeout_approach = self.class::TIMEOUT_FALLBACK - - msg = "Job '#{job_id}' not complete (size: #{total}).\n" - msg << " Timed out after #{timeout} seconds.\n" - msg << " Current status: #{status}" - - raise API::Errors::TimeOut, msg - end - - def checked? - @checked - end - - def status_check_in(pending, timeout_in:) - unless checked? - @checked = true - return min_delay_status_check - end - - return default_delay_status_check if around_min_throughput? - - eta = eta_for(pending, approach: :optimistic) - check_in_max = [eta, timeout_in].min * 0.90 - - check_in_best = check_in_max / 2.0 - default_5 = default_delay_status_check * 5 - - top_check = [default_5, check_in_best].min.ceil - [top_check, min_delay_status_check].max - end - - def default_delay_status_check - self.class::DELAY_STATUS_CHECK - end - - def min_delay_status_check - self.class::MIN_STATUS_CHECK end end end end end