lib/zold/node/farm.rb in zold-0.14.35 vs lib/zold/node/farm.rb in zold-0.14.36

- old
+ new

@@ -40,11 +40,11 @@ def best [] end end - def initialize(invoice, cache, log: Log::Quiet.new) + def initialize(invoice, cache = File.join(Dir.pwd, 'farm'), log: Log::Quiet.new) @log = log @cache = cache @invoice = invoice @pipeline = Queue.new @threads = [] @@ -90,12 +90,19 @@ end @alive = true @cleanup = Thread.new do Thread.current.abort_on_exception = true Thread.current.name = 'cleanup' - while @alive - sleep(60) unless strength == 1 # which will only happen in tests + loop do + a = [0..600].take_while do + sleep 0.1 + @alive + end + unless a.count == 600 + @log.info("It's time to stop the cleanup thread...") + break + end VerboseThread.new(@log).run do cleanup(host, port, strength, threads) end end end @@ -104,29 +111,35 @@ begin yield(self) ensure @log.info("Terminating the farm with #{@threads.count} threads...") start = Time.now - @alive = false - if strength == 1 - @cleanup.join - @log.info("Cleanup thread finished in #{(Time.now - start).round(2)}s") - else - @cleanup.exit - @log.info("Cleanup thread killed in #{(Time.now - start).round(2)}s") - end - @threads.each do |t| - tstart = Time.now - t.join(0.1) - @log.info("Thread #{t.name} finished in #{(Time.now - tstart).round(2)}s") - end + finish(@cleanup) + @threads.each { |t| finish(t) } @log.info("Farm stopped in #{(Time.now - start).round(2)}s") end end private + def finish(thread) + start = Time.now + @alive = false + @log.info("Attempting to terminate the thread \"#{thread.name}\"...") + loop do + delay = (Time.now - start).round(2) + if thread.join(0.1) + @log.info("Thread \"#{thread.name}\" finished in #{delay}s") + break + end + if delay > 10 + thread.exit + @log.error("Thread \"#{thread.name}\" forcefully terminated after #{delay}s") + end + end + end + def cleanup(host, port, strength, threads) scores = load before = scores.map(&:value).max.to_i save(threads, [Score.new(time: Time.now, host: host, port: port, invoice: @invoice, strength: strength)]) scores = load @@ -139,37 +152,51 @@ free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } } @pipeline << free[0] if @pipeline.size.zero? && !free.empty? end def cycle(host, port, strength, threads) - s = @pipeline.pop + s = [] + loop do + return unless @alive + begin + s << @pipeline.pop(true) + rescue ThreadError => _ + sleep 0.25 + end + s.compact! + break unless s.empty? + end + s = s[0] return unless s.valid? return unless s.host == host return unless s.port == port return unless s.strength >= strength Thread.current.name = s.to_mnemo bin = File.expand_path(File.join(File.dirname(__FILE__), '../../../bin/zold')) - Open3.popen2e("ruby #{bin} next \"#{s}\"") do |stdin, stdout, thr| + Open3.popen2e("ruby #{bin} --skip-upgrades next \"#{s}\"") do |stdin, stdout, thr| @log.debug("Score counting started in process ##{thr.pid}") begin stdin.close buffer = +'' loop do begin buffer << stdout.read_nonblock(1024) rescue IO::WaitReadable => e - @log.debug("Still waiting for data from the score provider: #{e.message}") + @log.debug("Still waiting for the data from the process ##{thr.pid}: #{e.message}") end - if buffer.end_with?("\n") + if buffer.end_with?("\n") && thr.value.to_i.zero? score = Score.parse(buffer.strip) @log.debug("New score discovered: #{score}") save(threads, [score]) cleanup(host, port, strength, threads) break end - break if stdout.eof? + if stdout.closed? + raise "Failed to calculate the score (##{thr.value}): #{buffer}" unless thr.value.to_i.zero? + break + end break unless @alive - sleep 0.1 + sleep 0.25 end rescue StandardError => e @log.error(Backtrace.new(e).to_s) ensure kill(thr.pid)