lib/zold/node/farm.rb in zold-0.16.11 vs lib/zold/node/farm.rb in zold-0.16.12

- old
+ new

@@ -96,12 +96,11 @@ threads: @threads.map do |t| "#{t.name}/#{t.status}/#{t.alive? ? 'alive' : 'dead'}" end.join(', '), cleanup: @cleanup.status, pipeline: @pipeline.size, - best: best.map(&:to_mnemo).join(', '), - alive: @alive + best: best.map(&:to_mnemo).join(', ') } end # Starts a farm, all threads, and yields the block provided. You are # supposed to use it only with the block: @@ -120,73 +119,43 @@ @log.info("No scores found in cache at #{@cache}") else @log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}") end cleanup(host, port, strength, threads) - @alive = true @threads = (1..threads).map do |t| Thread.new do Thread.current.abort_on_exception = true Thread.current.name = "f#{t}" loop do VerboseThread.new(@log).run do cycle(host, port, strength, threads) end - break unless @alive end end end @cleanup = Thread.new do Thread.current.abort_on_exception = true Thread.current.name = 'cleanup' loop do - max = 100 - a = (0..max - 1).take_while do - sleep 0.01 - @alive - end - unless a.count == max - @log.info("It's time to stop the cleanup thread (#{a.count} != #{max}, alive=#{@alive})...") - break - end VerboseThread.new(@log).run(true) do cleanup(host, port, strength, threads) end end end @log.info("Farm started with #{@threads.count} threads at #{host}:#{port}, strength is #{strength}") return unless block_given? begin yield(self) ensure - @log.info("Terminating the farm with #{@threads.count} threads...") - start = Time.now - finish(@cleanup) - @threads.each { |t| finish(t) } - @log.info("Farm stopped in #{Age.new(start)} (threads=#{threads}, strength=#{strength})") + @cleanup.kill + @threads.each(&:kill) + @log.info("Farm stopped (threads=#{threads}, strength=#{strength})") end end private - def finish(thread) - start = Time.now - @alive = false - @log.info("Attempting to terminate the thread \"#{thread.name}\" of the farm...") - loop do - delay = Time.now - start - if thread.join(0.1) - @log.info("Thread \"#{thread.name}\" peacefully finished in #{Age.new(start)}") - break - end - if delay > 1 - thread.exit - @log.error("Thread \"#{thread.name}\" forcefully terminated after #{Age.new(start)}") - end - end - end - def cleanup(host, port, strength, threads) scores = load before = scores.map(&:value).max.to_i save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: strength)]) scores = load @@ -198,10 +167,9 @@ end def cycle(host, port, strength, threads) s = [] loop do - return unless @alive begin s << @pipeline.pop(true) rescue ThreadError => _ sleep 0.25 end