lib/zold/node/farm.rb in zold-0.16.12 vs lib/zold/node/farm.rb in zold-0.16.13

- old
+ new

@@ -22,15 +22,16 @@ require 'time' require 'open3' require 'backtrace' require 'futex' +require 'concurrent' require 'json' require 'zold/score' require_relative '../log' require_relative '../age' -require_relative '../verbose_thread' +require_relative '../endless' require_relative 'farmers' # The farm of scores. # Author:: Yegor Bugayenko (yegor256@gmail.com) # Copyright:: Copyright (c) 2018 Yegor Bugayenko @@ -94,11 +95,10 @@ def to_json { threads: @threads.map do |t| "#{t.name}/#{t.status}/#{t.alive? ? 'alive' : 'dead'}" end.join(', '), - cleanup: @cleanup.status, pipeline: @pipeline.size, best: best.map(&:to_mnemo).join(', ') } end @@ -110,45 +110,40 @@ # # Everything else... # end # # The farm will stop all its threads and close all resources safely # right after the block provided exists. - def start(host, port, strength: 8, threads: 8) + def start(host, port, strength: Score::STRENGTH, threads: Concurrent.processor_count) raise 'Block is required for the farm to start' unless block_given? @log.info('Zero-threads farm won\'t score anything!') if threads.zero? if best.empty? - @log.info("No scores found in cache at #{@cache}") + @log.info("No scores found in the cache at #{@cache}") else @log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}") end - cleanup(host, port, strength, threads) @threads = (1..threads).map do |t| + Thread.current.thread_variable_set(:tid, t.to_s) Thread.new do - Thread.current.abort_on_exception = true - Thread.current.name = "f#{t}" - loop do - VerboseThread.new(@log).run do - cycle(host, port, strength, threads) - end + Endless.new("f#{t}", log: @log).run do + cycle(host, port, strength, threads) end end end - @cleanup = Thread.new do - Thread.current.abort_on_exception = true - Thread.current.name = 'cleanup' - loop do - VerboseThread.new(@log).run(true) do - cleanup(host, port, strength, threads) - end + ready = false + @threads << Thread.new do + Endless.new('cleanup', log: @log).run do + cleanup(host, port, strength, threads) + ready = true + sleep(1) end end - @log.info("Farm started with #{@threads.count} threads at #{host}:#{port}, strength is #{strength}") - return unless block_given? + @log.info("Farm started with #{@threads.count} threads (one for cleanup) \ +at #{host}:#{port}, strength is #{strength}") + loop { break if ready } begin yield(self) ensure - @cleanup.kill @threads.each(&:kill) @log.info("Farm stopped (threads=#{threads}, strength=#{strength})") end end @@ -170,11 +165,11 @@ s = [] loop do begin s << @pipeline.pop(true) rescue ThreadError => _ - sleep 0.25 + sleep(0.25) end s.compact! break unless s.empty? end s = s[0] @@ -191,11 +186,11 @@ end def save(threads, list = []) scores = load + list period = @lifetime / [threads, 1].max - Futex.new(@cache, log: @log).open do |f| + Futex.new(@cache).open do |f| IO.write( f, scores.select(&:valid?) .reject(&:expired?) .sort_by(&:value) @@ -208,22 +203,19 @@ ) end end def load - Futex.new(@cache, log: @log).open do |f| - if File.exist?(f) - IO.read(f).split(/\n/).map do |t| - begin - Score.parse(t) - rescue StandardError => e - @log.error(Backtrace.new(e).to_s) - nil - end - end.compact - else - [] - end + return [] unless File.exist?(@cache) + Futex.new(@cache).open do |f| + IO.read(f).split(/\n/).map do |t| + begin + Score.parse(t) + rescue StandardError => e + @log.error(Backtrace.new(e).to_s) + nil + end + end.compact end end end end