lib/zold/node/farm.rb in zold-0.16.9 vs lib/zold/node/farm.rb in zold-0.16.10
- old
+ new
@@ -23,12 +23,12 @@
require 'time'
require 'open3'
require 'backtrace'
require 'futex'
require 'json'
+require 'zold/score'
require_relative '../log'
-require_relative '../score'
require_relative '../age'
require_relative '../verbose_thread'
require_relative 'farmers'
# The farm of scores.
@@ -43,28 +43,44 @@
def best
[]
end
end
+ # Makes an instance of a farm. There should be only farm in the entire
+ # application, but you can, of course, start as many of them as necessary for the
+ # purpose of unit testing.
+ #
+ # <tt>cache</tt> is the file where the farm will keep all the scores it
+ # manages to find. If the file is absent, it will be created, together with
+ # the necesary parent directories.
+ #
+ # <tt>lifetime</tt> is the amount of seconds for a score to live in the farm, by default
+ # it's the entire day, since the Score expires in 24 hours; can be decreased for the
+ # purpose of unit testing.
def initialize(invoice, cache = File.join(Dir.pwd, 'farm'), log: Log::Quiet.new,
- farmer: Farmers::Plain.new)
+ farmer: Farmers::Plain.new, lifetime: 24 * 60 * 60)
@log = log
- @cache = cache
+ @cache = File.expand_path(cache)
@invoice = invoice
@pipeline = Queue.new
@farmer = farmer
@threads = []
+ @lifetime = lifetime
end
+ # Returns the list of best scores the farm managed to find up to now. The
+ # list is NEVER empty, even if the farm has just started. If it's empty,
+ # it's definitely a bug. If the farm is just fresh start, the list will
+ # contain a single score with a zero value.
def best
load
end
def to_text
[
"Current time: #{Time.now.utc.iso8601}",
- "Ruby processes: #{`ps -a | grep zold | wc -l`}",
+ "Ruby processes: #{`ps ax | grep zold | wc -l`}",
JSON.pretty_generate(to_json),
@threads.map do |t|
trace = t.backtrace || []
[
"#{t.name}: status=#{t.status}; alive=#{t.alive?}",
@@ -85,14 +101,29 @@
best: best.map(&:to_mnemo).join(', '),
alive: @alive
}
end
+ # Starts a farm, all threads, and yields the block provided. You are
+ # supposed to use it only with the block:
+ #
+ # Farm.new.start('example.org', 4096) do |farm|
+ # score = farm.best[0]
+ # # Everything else...
+ # end
+ #
+ # The farm will stop all its threads and close all resources safely
+ # right after the block provided exists.
def start(host, port, strength: 8, threads: 8)
+ raise 'Block is required for the farm to start' unless block_given?
@log.info('Zero-threads farm won\'t score anything!') if threads.zero?
+ if best.empty?
+ @log.info("No scores found in cache at #{@cache}")
+ else
+ @log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}")
+ end
cleanup(host, port, strength, threads)
- @log.info("#{@pipeline.size} scores pre-loaded, the best is: #{best[0]}")
@alive = true
@threads = (1..threads).map do |t|
Thread.new do
Thread.current.abort_on_exception = true
Thread.current.name = "f#{t}"
@@ -106,13 +137,13 @@
end
@cleanup = Thread.new do
Thread.current.abort_on_exception = true
Thread.current.name = 'cleanup'
loop do
- max = 600
- a = (0..max).take_while do
- sleep 0.1
+ max = 100
+ a = (0..max - 1).take_while do
+ sleep 0.01
@alive
end
unless a.count == max
@log.info("It's time to stop the cleanup thread (#{a.count} != #{max}, alive=#{@alive})...")
break
@@ -129,11 +160,11 @@
ensure
@log.info("Terminating the farm with #{@threads.count} threads...")
start = Time.now
finish(@cleanup)
@threads.each { |t| finish(t) }
- @log.info("Farm stopped in #{Age.new(start)}")
+ @log.info("Farm stopped in #{Age.new(start)} (threads=#{threads}, strength=#{strength})")
end
end
private
@@ -142,11 +173,11 @@
@alive = false
@log.info("Attempting to terminate the thread \"#{thread.name}\" of the farm...")
loop do
delay = Time.now - start
if thread.join(0.1)
- @log.info("Thread \"#{thread.name}\" finished in #{Age.new(start)}")
+ @log.info("Thread \"#{thread.name}\" peacefully finished in #{Age.new(start)}")
break
end
if delay > 1
thread.exit
@log.error("Thread \"#{thread.name}\" forcefully terminated after #{Age.new(start)}")
@@ -155,20 +186,17 @@
end
def cleanup(host, port, strength, threads)
scores = load
before = scores.map(&:value).max.to_i
- save(threads, [Score.new(time: Time.now, host: host, port: port, invoice: @invoice, strength: strength)])
+ save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: strength)])
scores = load
- push(scores)
- after = scores.map(&:value).max.to_i
- @log.debug("#{Thread.current.name}: best score is #{scores[0]}") if before != after && !after.zero?
- end
-
- def push(scores)
free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } }
@pipeline << free[0] if @pipeline.size.zero? && !free.empty?
+ after = scores.map(&:value).max.to_i
+ return unless before != after && !after.zero?
+ @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0]}")
end
def cycle(host, port, strength, threads)
s = []
loop do
@@ -187,18 +215,18 @@
return unless s.port == port
return unless s.strength >= strength
Thread.current.name = s.to_mnemo
Thread.current.thread_variable_set(:start, Time.now.utc.iso8601)
score = @farmer.up(s)
- @log.debug("New score discovered: #{score}")
+ @log.debug("New score discovered: #{score}") if strength > 4
save(threads, [score])
cleanup(host, port, strength, threads)
end
def save(threads, list = [])
scores = load + list
- period = 24 * 60 * 60 / [threads, 1].max
+ period = @lifetime / [threads, 1].max
Futex.new(@cache, log: @log).open do |f|
IO.write(
f,
scores.select(&:valid?)
.reject(&:expired?)
@@ -214,22 +242,20 @@
end
def load
Futex.new(@cache, log: @log).open do |f|
if File.exist?(f)
- IO.read(f).split(/\n/)
- .map { |t| parse_score_line(t) }
- .reject(&:zero?)
+ IO.read(f).split(/\n/).map do |t|
+ begin
+ Score.parse(t)
+ rescue StandardError => e
+ @log.error(Backtrace.new(e).to_s)
+ nil
+ end
+ end.compact
else
[]
end
end
- end
-
- def parse_score_line(line)
- Score.parse(line)
- rescue StandardError => e
- @log.error(Backtrace.new(e).to_s)
- Score::ZERO
end
end
end