lib/zold/node/farm.rb in zold-0.13.22 vs lib/zold/node/farm.rb in zold-0.13.23

- old
+ new

@@ -40,45 +40,39 @@ def initialize(invoice, cache, log: Log::Quiet.new) @log = log @cache = cache @invoice = invoice - @scores = [] + @pipeline = Queue.new @threads = [] - @best = [] @mutex = Mutex.new end def best - @mutex.synchronize do - @best.to_a - end + load end def to_text @threads.map do |t| - "#{t.name}: status=#{t.status}; alive=#{t.alive};\n #{t.backtrace.join("\n ")}" + "#{t.name}: status=#{t.status}; alive=#{t.alive?};\n #{t.backtrace.join("\n ")}" end.join("\n") end def to_json { threads: @threads.map do |t| "#{t.name}/#{t.status}/#{t.alive? ? 'A' : 'D'}" end.join(', '), - scores: @scores.size, - best: @best.map(&:value).join(', '), - history: history.count + pipeline: @pipeline.size, + best: best.map { |s| "#{s.value}/#{(s.age / 60).round}m" }.join(', ') } end def start(host, port, strength: 8, threads: 8) - @log.debug('Zero-threads farm won\'t score anything!') if threads.zero? - @scores = Queue.new - @best = history - clean(host, port, strength, threads) - @log.info("#{@scores.size} scores pre-loaded, the best is: #{@best[0]}") + @log.info('Zero-threads farm won\'t score anything!') if threads.zero? + cleanup(host, port, strength, threads) + @log.info("#{@pipeline.size} scores pre-loaded, the best is: #{best[0]}") @threads = (1..threads).map do |t| Thread.new do Thread.current.name = "f#{t}" loop do VerboseThread.new(@log).run do @@ -86,15 +80,15 @@ end end end end @threads << Thread.new do - Thread.current.name = 'cleaner' + Thread.current.name = 'cleanup' loop do sleep(60) unless strength == 1 # which will only happen in tests VerboseThread.new(@log).run do - clean(host, port, strength, threads) + cleanup(host, port, strength, threads) end end end @log.info("Farm started with #{@threads.count} threads at #{host}:#{port}, strength is #{strength}") return unless block_given? @@ -116,52 +110,59 @@ @log.info("Farm stopped in #{(Time.now - start).round(2)}s") end private - def clean(host, port, strength, threads) - @mutex.synchronize do - before = @best.map(&:value).max.to_i - @best = @best.select(&:valid?).reject(&:expired?).sort_by(&:value).reverse - @best = @best.take(threads) unless threads.zero? - if @best.empty? || !threads.zero? && @best.map(&:age_hours).min > 24 / threads - @best << Score.new(Time.now, host, port, @invoice, strength: strength) - end - @best.sort_by(&:age_hours).each { |b| @scores << b } - after = @best.map(&:value).max - @log.debug("#{Thread.current.name}: best score is #{@best[0]}") if before != after && !after.zero? + def cleanup(host, port, strength, threads) + scores = load + before = scores.map(&:value).max.to_i + if scores.empty? || !threads.zero? && scores.map(&:age).min > 24 * 60 * 60 / threads + save([Score.new(Time.now, host, port, @invoice, strength: strength)]) + else + save end + scores = load + @pipeline << scores.min_by(&:age) if @pipeline.size < threads + after = scores.map(&:value).max.to_i + @log.debug("#{Thread.current.name}: best score is #{scores[0]}") if before != after && !after.zero? end def cycle(host, port, strength, threads) - s = @scores.pop + s = @pipeline.pop return unless s.valid? return unless s.host == host return unless s.port == port return unless s.strength >= strength - n = s.next + save([s.next]) + cleanup(host, port, strength, threads) + end + + def save(list = []) + scores = load + list @mutex.synchronize do - save(n) - @best << n + AtomicFile.new(@cache).write( + scores.select(&:valid?) + .reject(&:expired?) + .map(&:to_s) + .uniq + .join("\n") + ) end - clean(host, port, strength, threads) - @scores << n end - def save(score) - AtomicFile.new(@cache).write((history + [score]).map(&:to_s).uniq.join("\n")) - end - - def history - if File.exist?(@cache) - AtomicFile.new(@cache).read - .split(/\n/) - .map { |t| Score.parse(t) } - .select(&:valid?) - .sort_by(&:value) - .reverse - else - [] + def load + @mutex.synchronize do + if File.exist?(@cache) + AtomicFile.new(@cache).read + .split(/\n/) + .map { |t| Score.parse(t) } + .select(&:valid?) + .reject(&:expired?) + .sort_by(&:value) + .reverse + else + [] + end end end end end