lib/zold/node/farm.rb in zold-0.13.22 vs lib/zold/node/farm.rb in zold-0.13.23
- old
+ new
@@ -40,45 +40,39 @@
def initialize(invoice, cache, log: Log::Quiet.new)
@log = log
@cache = cache
@invoice = invoice
- @scores = []
+ @pipeline = Queue.new
@threads = []
- @best = []
@mutex = Mutex.new
end
def best
- @mutex.synchronize do
- @best.to_a
- end
+ load
end
def to_text
@threads.map do |t|
- "#{t.name}: status=#{t.status}; alive=#{t.alive};\n #{t.backtrace.join("\n ")}"
+ "#{t.name}: status=#{t.status}; alive=#{t.alive?};\n #{t.backtrace.join("\n ")}"
end.join("\n")
end
def to_json
{
threads: @threads.map do |t|
"#{t.name}/#{t.status}/#{t.alive? ? 'A' : 'D'}"
end.join(', '),
- scores: @scores.size,
- best: @best.map(&:value).join(', '),
- history: history.count
+ pipeline: @pipeline.size,
+ best: best.map { |s| "#{s.value}/#{(s.age / 60).round}m" }.join(', ')
}
end
def start(host, port, strength: 8, threads: 8)
- @log.debug('Zero-threads farm won\'t score anything!') if threads.zero?
- @scores = Queue.new
- @best = history
- clean(host, port, strength, threads)
- @log.info("#{@scores.size} scores pre-loaded, the best is: #{@best[0]}")
+ @log.info('Zero-threads farm won\'t score anything!') if threads.zero?
+ cleanup(host, port, strength, threads)
+ @log.info("#{@pipeline.size} scores pre-loaded, the best is: #{best[0]}")
@threads = (1..threads).map do |t|
Thread.new do
Thread.current.name = "f#{t}"
loop do
VerboseThread.new(@log).run do
@@ -86,15 +80,15 @@
end
end
end
end
@threads << Thread.new do
- Thread.current.name = 'cleaner'
+ Thread.current.name = 'cleanup'
loop do
sleep(60) unless strength == 1 # which will only happen in tests
VerboseThread.new(@log).run do
- clean(host, port, strength, threads)
+ cleanup(host, port, strength, threads)
end
end
end
@log.info("Farm started with #{@threads.count} threads at #{host}:#{port}, strength is #{strength}")
return unless block_given?
@@ -116,52 +110,59 @@
@log.info("Farm stopped in #{(Time.now - start).round(2)}s")
end
private
- def clean(host, port, strength, threads)
- @mutex.synchronize do
- before = @best.map(&:value).max.to_i
- @best = @best.select(&:valid?).reject(&:expired?).sort_by(&:value).reverse
- @best = @best.take(threads) unless threads.zero?
- if @best.empty? || !threads.zero? && @best.map(&:age_hours).min > 24 / threads
- @best << Score.new(Time.now, host, port, @invoice, strength: strength)
- end
- @best.sort_by(&:age_hours).each { |b| @scores << b }
- after = @best.map(&:value).max
- @log.debug("#{Thread.current.name}: best score is #{@best[0]}") if before != after && !after.zero?
+ def cleanup(host, port, strength, threads)
+ scores = load
+ before = scores.map(&:value).max.to_i
+ if scores.empty? || !threads.zero? && scores.map(&:age).min > 24 * 60 * 60 / threads
+ save([Score.new(Time.now, host, port, @invoice, strength: strength)])
+ else
+ save
end
+ scores = load
+ @pipeline << scores.min_by(&:age) if @pipeline.size < threads
+ after = scores.map(&:value).max.to_i
+ @log.debug("#{Thread.current.name}: best score is #{scores[0]}") if before != after && !after.zero?
end
def cycle(host, port, strength, threads)
- s = @scores.pop
+ s = @pipeline.pop
return unless s.valid?
return unless s.host == host
return unless s.port == port
return unless s.strength >= strength
- n = s.next
+ save([s.next])
+ cleanup(host, port, strength, threads)
+ end
+
+ def save(list = [])
+ scores = load + list
@mutex.synchronize do
- save(n)
- @best << n
+ AtomicFile.new(@cache).write(
+ scores.select(&:valid?)
+ .reject(&:expired?)
+ .map(&:to_s)
+ .uniq
+ .join("\n")
+ )
end
- clean(host, port, strength, threads)
- @scores << n
end
- def save(score)
- AtomicFile.new(@cache).write((history + [score]).map(&:to_s).uniq.join("\n"))
- end
-
- def history
- if File.exist?(@cache)
- AtomicFile.new(@cache).read
- .split(/\n/)
- .map { |t| Score.parse(t) }
- .select(&:valid?)
- .sort_by(&:value)
- .reverse
- else
- []
+ def load
+ @mutex.synchronize do
+ if File.exist?(@cache)
+ AtomicFile.new(@cache).read
+ .split(/\n/)
+ .map { |t| Score.parse(t) }
+ .select(&:valid?)
+ .reject(&:expired?)
+ .sort_by(&:value)
+ .reverse
+ else
+ []
+ end
end
end
end
end