lib/zold/node/farm.rb in zold-0.17.3 vs lib/zold/node/farm.rb in zold-0.17.4
- old
+ new
@@ -56,18 +56,19 @@
#
# <tt>lifetime</tt> is the amount of seconds for a score to live in the farm, by default
# it's the entire day, since the Score expires in 24 hours; can be decreased for the
# purpose of unit testing.
def initialize(invoice, cache = File.join(Dir.pwd, 'farm'), log: Log::NULL,
- farmer: Farmers::Plain.new, lifetime: 24 * 60 * 60)
+ farmer: Farmers::Plain.new, lifetime: 24 * 60 * 60, strength: Score::STRENGTH)
@log = log
@cache = File.expand_path(cache)
@invoice = invoice
@pipeline = Queue.new
@farmer = farmer
@threads = []
@lifetime = lifetime
+ @strength = strength
end
# Returns the list of best scores the farm managed to find up to now. The
# list is NEVER empty, even if the farm has just started. If it's empty,
# it's definitely a bug. If the farm is just fresh start, the list will
@@ -110,11 +111,11 @@
# # Everything else...
# end
#
# The farm will stop all its threads and close all resources safely
# right after the block provided exists.
- def start(host, port, strength: Score::STRENGTH, threads: Concurrent.processor_count)
+ def start(host, port, threads: Concurrent.processor_count)
raise 'Block is required for the farm to start' unless block_given?
@log.info('Zero-threads farm won\'t score anything!') if threads.zero?
if best.empty?
@log.info("No scores found in the cache at #{@cache}")
else
@@ -122,55 +123,55 @@
end
@threads = (1..threads).map do |t|
Thread.new do
Thread.current.thread_variable_set(:tid, t.to_s)
Endless.new("f#{t}", log: @log).run do
- cycle(host, port, strength, threads)
+ cycle(host, port, threads)
end
end
end
unless threads.zero?
ready = false
@threads << Thread.new do
Endless.new('cleanup', log: @log).run do
- cleanup(host, port, strength, threads)
+ cleanup(host, port, threads)
ready = true
sleep(1)
end
end
loop { break if ready }
end
if @threads.empty?
- cleanup(host, port, strength, threads)
+ cleanup(host, port, threads)
@log.info("Farm started with no threads (there will be no score) at #{host}:#{port}")
else
@log.info("Farm started with #{@threads.count} threads (one for cleanup) \
-at #{host}:#{port}, strength is #{strength}")
+at #{host}:#{port}, strength is #{@strength}")
end
begin
yield(self)
ensure
@threads.each(&:kill)
- @log.info("Farm stopped (threads=#{threads}, strength=#{strength})")
+ @log.info("Farm stopped (threads=#{threads}, strength=#{@strength})")
end
end
private
- def cleanup(host, port, strength, threads)
+ def cleanup(host, port, threads)
scores = load
before = scores.map(&:value).max.to_i
- save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: strength)])
+ save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: @strength)])
scores = load
free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } }
@pipeline << free[0] if @pipeline.size.zero? && !free.empty?
after = scores.map(&:value).max.to_i
return unless before != after && !after.zero?
@log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0]}")
end
- def cycle(host, port, strength, threads)
+ def cycle(host, port, threads)
s = []
loop do
begin
s << @pipeline.pop(true)
rescue ThreadError => _
@@ -181,26 +182,27 @@
end
s = s[0]
return unless s.valid?
return unless s.host == host
return unless s.port == port
- return unless s.strength >= strength
+ return unless s.strength >= @strength
Thread.current.name = s.to_mnemo
Thread.current.thread_variable_set(:start, Time.now.utc.iso8601)
score = @farmer.up(s)
- @log.debug("New score discovered: #{score}") if strength > 4
+ @log.debug("New score discovered: #{score}") if @strength > 4
save(threads, [score])
- cleanup(host, port, strength, threads)
+ cleanup(host, port, threads)
end
def save(threads, list = [])
scores = load + list
period = @lifetime / [threads, 1].max
Futex.new(@cache).open do |f|
IO.write(
f,
scores.select(&:valid?)
.reject(&:expired?)
+ .reject { |s| s.strength < @strength }
.sort_by(&:value)
.reverse
.uniq(&:time)
.uniq { |s| (s.age / period).round }
.map(&:to_s)