lib/zold/node/farm.rb in zold-0.14.35 vs lib/zold/node/farm.rb in zold-0.14.36
- old
+ new
@@ -40,11 +40,11 @@
def best
[]
end
end
- def initialize(invoice, cache, log: Log::Quiet.new)
+ def initialize(invoice, cache = File.join(Dir.pwd, 'farm'), log: Log::Quiet.new)
@log = log
@cache = cache
@invoice = invoice
@pipeline = Queue.new
@threads = []
@@ -90,12 +90,19 @@
end
@alive = true
@cleanup = Thread.new do
Thread.current.abort_on_exception = true
Thread.current.name = 'cleanup'
- while @alive
- sleep(60) unless strength == 1 # which will only happen in tests
+ loop do
+ a = [0..600].take_while do
+ sleep 0.1
+ @alive
+ end
+ unless a.count == 600
+ @log.info("It's time to stop the cleanup thread...")
+ break
+ end
VerboseThread.new(@log).run do
cleanup(host, port, strength, threads)
end
end
end
@@ -104,29 +111,35 @@
begin
yield(self)
ensure
@log.info("Terminating the farm with #{@threads.count} threads...")
start = Time.now
- @alive = false
- if strength == 1
- @cleanup.join
- @log.info("Cleanup thread finished in #{(Time.now - start).round(2)}s")
- else
- @cleanup.exit
- @log.info("Cleanup thread killed in #{(Time.now - start).round(2)}s")
- end
- @threads.each do |t|
- tstart = Time.now
- t.join(0.1)
- @log.info("Thread #{t.name} finished in #{(Time.now - tstart).round(2)}s")
- end
+ finish(@cleanup)
+ @threads.each { |t| finish(t) }
@log.info("Farm stopped in #{(Time.now - start).round(2)}s")
end
end
private
+ def finish(thread)
+ start = Time.now
+ @alive = false
+ @log.info("Attempting to terminate the thread \"#{thread.name}\"...")
+ loop do
+ delay = (Time.now - start).round(2)
+ if thread.join(0.1)
+ @log.info("Thread \"#{thread.name}\" finished in #{delay}s")
+ break
+ end
+ if delay > 10
+ thread.exit
+ @log.error("Thread \"#{thread.name}\" forcefully terminated after #{delay}s")
+ end
+ end
+ end
+
def cleanup(host, port, strength, threads)
scores = load
before = scores.map(&:value).max.to_i
save(threads, [Score.new(time: Time.now, host: host, port: port, invoice: @invoice, strength: strength)])
scores = load
@@ -139,37 +152,51 @@
free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } }
@pipeline << free[0] if @pipeline.size.zero? && !free.empty?
end
def cycle(host, port, strength, threads)
- s = @pipeline.pop
+ s = []
+ loop do
+ return unless @alive
+ begin
+ s << @pipeline.pop(true)
+ rescue ThreadError => _
+ sleep 0.25
+ end
+ s.compact!
+ break unless s.empty?
+ end
+ s = s[0]
return unless s.valid?
return unless s.host == host
return unless s.port == port
return unless s.strength >= strength
Thread.current.name = s.to_mnemo
bin = File.expand_path(File.join(File.dirname(__FILE__), '../../../bin/zold'))
- Open3.popen2e("ruby #{bin} next \"#{s}\"") do |stdin, stdout, thr|
+ Open3.popen2e("ruby #{bin} --skip-upgrades next \"#{s}\"") do |stdin, stdout, thr|
@log.debug("Score counting started in process ##{thr.pid}")
begin
stdin.close
buffer = +''
loop do
begin
buffer << stdout.read_nonblock(1024)
rescue IO::WaitReadable => e
- @log.debug("Still waiting for data from the score provider: #{e.message}")
+ @log.debug("Still waiting for the data from the process ##{thr.pid}: #{e.message}")
end
- if buffer.end_with?("\n")
+ if buffer.end_with?("\n") && thr.value.to_i.zero?
score = Score.parse(buffer.strip)
@log.debug("New score discovered: #{score}")
save(threads, [score])
cleanup(host, port, strength, threads)
break
end
- break if stdout.eof?
+ if stdout.closed?
+ raise "Failed to calculate the score (##{thr.value}): #{buffer}" unless thr.value.to_i.zero?
+ break
+ end
break unless @alive
- sleep 0.1
+ sleep 0.25
end
rescue StandardError => e
@log.error(Backtrace.new(e).to_s)
ensure
kill(thr.pid)