lib/zold/node/farm.rb in zold-0.17.3 vs lib/zold/node/farm.rb in zold-0.17.4

- old
+ new

@@ -56,18 +56,19 @@ # # <tt>lifetime</tt> is the amount of seconds for a score to live in the farm, by default # it's the entire day, since the Score expires in 24 hours; can be decreased for the # purpose of unit testing. def initialize(invoice, cache = File.join(Dir.pwd, 'farm'), log: Log::NULL, - farmer: Farmers::Plain.new, lifetime: 24 * 60 * 60) + farmer: Farmers::Plain.new, lifetime: 24 * 60 * 60, strength: Score::STRENGTH) @log = log @cache = File.expand_path(cache) @invoice = invoice @pipeline = Queue.new @farmer = farmer @threads = [] @lifetime = lifetime + @strength = strength end # Returns the list of best scores the farm managed to find up to now. The # list is NEVER empty, even if the farm has just started. If it's empty, # it's definitely a bug. If the farm is just fresh start, the list will @@ -110,11 +111,11 @@ # # Everything else... # end # # The farm will stop all its threads and close all resources safely # right after the block provided exists. - def start(host, port, strength: Score::STRENGTH, threads: Concurrent.processor_count) + def start(host, port, threads: Concurrent.processor_count) raise 'Block is required for the farm to start' unless block_given? @log.info('Zero-threads farm won\'t score anything!') if threads.zero? if best.empty? @log.info("No scores found in the cache at #{@cache}") else @@ -122,55 +123,55 @@ end @threads = (1..threads).map do |t| Thread.new do Thread.current.thread_variable_set(:tid, t.to_s) Endless.new("f#{t}", log: @log).run do - cycle(host, port, strength, threads) + cycle(host, port, threads) end end end unless threads.zero? ready = false @threads << Thread.new do Endless.new('cleanup', log: @log).run do - cleanup(host, port, strength, threads) + cleanup(host, port, threads) ready = true sleep(1) end end loop { break if ready } end if @threads.empty? - cleanup(host, port, strength, threads) + cleanup(host, port, threads) @log.info("Farm started with no threads (there will be no score) at #{host}:#{port}") else @log.info("Farm started with #{@threads.count} threads (one for cleanup) \ -at #{host}:#{port}, strength is #{strength}") +at #{host}:#{port}, strength is #{@strength}") end begin yield(self) ensure @threads.each(&:kill) - @log.info("Farm stopped (threads=#{threads}, strength=#{strength})") + @log.info("Farm stopped (threads=#{threads}, strength=#{@strength})") end end private - def cleanup(host, port, strength, threads) + def cleanup(host, port, threads) scores = load before = scores.map(&:value).max.to_i - save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: strength)]) + save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: @strength)]) scores = load free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } } @pipeline << free[0] if @pipeline.size.zero? && !free.empty? after = scores.map(&:value).max.to_i return unless before != after && !after.zero? @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0]}") end - def cycle(host, port, strength, threads) + def cycle(host, port, threads) s = [] loop do begin s << @pipeline.pop(true) rescue ThreadError => _ @@ -181,26 +182,27 @@ end s = s[0] return unless s.valid? return unless s.host == host return unless s.port == port - return unless s.strength >= strength + return unless s.strength >= @strength Thread.current.name = s.to_mnemo Thread.current.thread_variable_set(:start, Time.now.utc.iso8601) score = @farmer.up(s) - @log.debug("New score discovered: #{score}") if strength > 4 + @log.debug("New score discovered: #{score}") if @strength > 4 save(threads, [score]) - cleanup(host, port, strength, threads) + cleanup(host, port, threads) end def save(threads, list = []) scores = load + list period = @lifetime / [threads, 1].max Futex.new(@cache).open do |f| IO.write( f, scores.select(&:valid?) .reject(&:expired?) + .reject { |s| s.strength < @strength } .sort_by(&:value) .reverse .uniq(&:time) .uniq { |s| (s.age / period).round } .map(&:to_s)