lib/zold/node/farm.rb in zold-0.16.11 vs lib/zold/node/farm.rb in zold-0.16.12
- old
+ new
@@ -96,12 +96,11 @@
threads: @threads.map do |t|
"#{t.name}/#{t.status}/#{t.alive? ? 'alive' : 'dead'}"
end.join(', '),
cleanup: @cleanup.status,
pipeline: @pipeline.size,
- best: best.map(&:to_mnemo).join(', '),
- alive: @alive
+ best: best.map(&:to_mnemo).join(', ')
}
end
# Starts a farm, all threads, and yields the block provided. You are
# supposed to use it only with the block:
@@ -120,73 +119,43 @@
@log.info("No scores found in cache at #{@cache}")
else
@log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}")
end
cleanup(host, port, strength, threads)
- @alive = true
@threads = (1..threads).map do |t|
Thread.new do
Thread.current.abort_on_exception = true
Thread.current.name = "f#{t}"
loop do
VerboseThread.new(@log).run do
cycle(host, port, strength, threads)
end
- break unless @alive
end
end
end
@cleanup = Thread.new do
Thread.current.abort_on_exception = true
Thread.current.name = 'cleanup'
loop do
- max = 100
- a = (0..max - 1).take_while do
- sleep 0.01
- @alive
- end
- unless a.count == max
- @log.info("It's time to stop the cleanup thread (#{a.count} != #{max}, alive=#{@alive})...")
- break
- end
VerboseThread.new(@log).run(true) do
cleanup(host, port, strength, threads)
end
end
end
@log.info("Farm started with #{@threads.count} threads at #{host}:#{port}, strength is #{strength}")
return unless block_given?
begin
yield(self)
ensure
- @log.info("Terminating the farm with #{@threads.count} threads...")
- start = Time.now
- finish(@cleanup)
- @threads.each { |t| finish(t) }
- @log.info("Farm stopped in #{Age.new(start)} (threads=#{threads}, strength=#{strength})")
+ @cleanup.kill
+ @threads.each(&:kill)
+ @log.info("Farm stopped (threads=#{threads}, strength=#{strength})")
end
end
private
- def finish(thread)
- start = Time.now
- @alive = false
- @log.info("Attempting to terminate the thread \"#{thread.name}\" of the farm...")
- loop do
- delay = Time.now - start
- if thread.join(0.1)
- @log.info("Thread \"#{thread.name}\" peacefully finished in #{Age.new(start)}")
- break
- end
- if delay > 1
- thread.exit
- @log.error("Thread \"#{thread.name}\" forcefully terminated after #{Age.new(start)}")
- end
- end
- end
-
def cleanup(host, port, strength, threads)
scores = load
before = scores.map(&:value).max.to_i
save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: strength)])
scores = load
@@ -198,10 +167,9 @@
end
def cycle(host, port, strength, threads)
s = []
loop do
- return unless @alive
begin
s << @pipeline.pop(true)
rescue ThreadError => _
sleep 0.25
end