lib/zold/node/farm.rb in zold-0.18.6 vs lib/zold/node/farm.rb in zold-0.18.7

- old
+ new

@@ -26,10 +26,11 @@ require 'futex' require 'concurrent' require 'json' require 'zold/score' require_relative '../log' +require_relative '../thread_pool' require_relative '../age' require_relative '../endless' require_relative 'farmers' # The farm of scores. @@ -62,11 +63,11 @@ @log = log @cache = File.expand_path(cache) @invoice = invoice @pipeline = Queue.new @farmer = farmer - @threads = [] + @threads = ThreadPool.new('farm') @lifetime = lifetime @strength = strength end # Returns the list of best scores the farm managed to find up to now. The @@ -80,25 +81,18 @@ def to_text [ "Current time: #{Time.now.utc.iso8601}", "Ruby processes: #{`ps ax | grep zold | wc -l`}", JSON.pretty_generate(to_json), - @threads.map do |t| - trace = t.backtrace || [] - [ - "#{t.name}: status=#{t.status}; alive=#{t.alive?}", - 'Vars: ' + t.thread_variables.map { |v| "#{v}=\"#{t.thread_variable_get(v)}\"" }.join('; '), - " #{trace.join("\n ")}" - ].join("\n") - end + @threads.to_s ].flatten.join("\n\n") end # Renders the Farm into JSON to show for the end-user in front.rb. def to_json { - threads: @threads.map { |t| "#{t.name}/#{t.status}/#{t.alive? ? 'alive' : 'dead'}" }.join(', '), + threads: @threads.to_json, pipeline: @pipeline.size, best: best.map(&:to_mnemo).join(', '), farmer: @farmer.class.name } end @@ -119,57 +113,54 @@ if best.empty? @log.info("No scores found in the cache at #{@cache}") else @log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}") end - @threads = (1..threads).map do |t| - Thread.start do + (1..threads).map do |t| + @threads.add do Thread.current.thread_variable_set(:tid, t.to_s) Endless.new("f#{t}", log: @log).run do cycle(host, port, threads) end end end unless threads.zero? ready = false - @threads << Thread.start do + @threads.add do Endless.new('cleanup', log: @log).run do cleanup(host, port, threads) ready = true sleep(1) end end loop { break if ready } end - if @threads.empty? + if threads.zero? cleanup(host, port, threads) @log.info("Farm started with no threads (there will be no score) at #{host}:#{port}") else @log.info("Farm started with #{@threads.count} threads (one for cleanup) \ at #{host}:#{port}, strength is #{@strength}") end begin yield(self) ensure - @log.info("Farm stopping with #{threads} threads...") - @threads.each(&:kill) - @threads.each(&:join) - @log.info("Farm stopped, #{threads} threads killed") + @threads.kill end end private def cleanup(host, port, threads) scores = load before = scores.map(&:value).max.to_i save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: @strength)]) scores = load - free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } } + free = scores.reject { |s| @threads.exists?(s.to_mnemo) } @pipeline << free[0] if @pipeline.size.zero? && !free.empty? after = scores.map(&:value).max.to_i return unless before != after && !after.zero? - @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0]}") + @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0].reduced(4)}") end def cycle(host, port, threads) s = [] loop do