lib/ecoportal/api/v1/job/awaiter.rb in ecoportal-api-0.10.5 vs lib/ecoportal/api/v1/job/awaiter.rb in ecoportal-api-0.10.6
- old
+ new
@@ -1,114 +1,85 @@
+require_relative 'awaiter/timer'
+require_relative 'awaiter/status_frequency'
+
module Ecoportal
module API
class V1
class Job
class Awaiter
include Common::Client::TimeOut
+ include StatusFrequency
- DELAY_STATUS_CHECK = 4
- MIN_STATUS_CHECK = 2
-
TIMEOUT_APPROACH = :min # :conservative # adaptative timeout
TIMEOUT_FALLBACK = :min
- attr_reader :job, :job_id, :total
- attr_accessor :timeout_approach
+ attr_reader :job_api, :job_id, :total
- def initialize(job, job_id:, total:)
- @job = job
+ def initialize(job_api, job_id:, total:)
+ @job_api = job_api
@job_id = job_id
@total = total
- @checked = false
- self.timeout_approach = self.class::TIMEOUT_APPROACH
end
+ attr_writer :timeout_approach
+
+ def timeout_approach
+ @timeout_approach ||= self.class::TIMEOUT_APPROACH
+ end
+
# Allows to preserve the learned throughput
def new(**kargs)
- self.class.new(job, **kargs).tap do |out|
- out.throughput = throughput
+ self.class.new(job_api, **kargs).tap do |out|
+ out.throughput = throughput
+ out.timeout_approach = timeout_approach
end
end
def await_completion! # rubocop:disable Metrics/AbcSize
- max_timeout = timeout_for(total, approach: timeout_approach)
+ timeout = timeout_for(total, approach: timeout_approach)
- # timeout library is evil. So we make poor-man timeout.
- # https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/
- before = Time.now
+ first = 1
+ timer = Timer.new(
+ total: total,
+ timeout: timeout,
+ ldelay: first
+ )
+
delay_status_check = nil
loop do
- status = job.status(job_id)
- done = status.progress
- waited = Time.now - before
+ sleep(first.tap {first = nil}) if first
- adapted = waited
- adapted = waited - (delay_status_check / 2) if delay_status_check
- ratio = throughput!(adapted, count: done)
+ timer = timer.new(
+ status: job_api.status(job_id),
+ ldelay: delay_status_check
+ )
- break status if status.complete?(total)
+ # ratio = throughput!(timer.net_waited, count: timer.progress)
+ ratio = throughput!(timer.lwaited, count: timer.increased)
- pending = status.pending(total)
- left = max_timeout - waited
+ break timer.status if timer.complete?
- timeout!(status, timeout: max_timeout) unless left.positive?
+ timer.on_timeout! do
+ @timeout_approach = self.class::TIMEOUT_FALLBACK
+ end
- delay_status_check = status_check_in(pending, timeout_in: left)
+ delay_status_check = status_check_in(
+ timer.pending,
+ timeout_in: timer.timeout_in
+ )
msg = " ... Awaiting #{delay_status_check} s. -- "
- msg << " TimeOut: #{left.round(2)} s. "
+ msg << " TimeOut: #{timer.time_left} s. "
msg << "(job '#{job_id}') "
- msg << "Done: #{done} (est. #{ratio} rec/s) "
+ msg << "Done: #{timer.progress} (est. #{ratio.round(2)} rec/s) "
msg << " \r"
print msg
$stdout.flush
sleep(delay_status_check)
end
- end
-
- private
-
- def timeout!(status, timeout:)
- self.timeout_approach = self.class::TIMEOUT_FALLBACK
-
- msg = "Job '#{job_id}' not complete (size: #{total}).\n"
- msg << " Timed out after #{timeout} seconds.\n"
- msg << " Current status: #{status}"
-
- raise API::Errors::TimeOut, msg
- end
-
- def checked?
- @checked
- end
-
- def status_check_in(pending, timeout_in:)
- unless checked?
- @checked = true
- return min_delay_status_check
- end
-
- return default_delay_status_check if around_min_throughput?
-
- eta = eta_for(pending, approach: :optimistic)
- check_in_max = [eta, timeout_in].min * 0.90
-
- check_in_best = check_in_max / 2.0
- default_5 = default_delay_status_check * 5
-
- top_check = [default_5, check_in_best].min.ceil
- [top_check, min_delay_status_check].max
- end
-
- def default_delay_status_check
- self.class::DELAY_STATUS_CHECK
- end
-
- def min_delay_status_check
- self.class::MIN_STATUS_CHECK
end
end
end
end
end