lib/zold/node/farm.rb in zold-0.18.6 vs lib/zold/node/farm.rb in zold-0.18.7
- old
+ new
@@ -26,10 +26,11 @@
require 'futex'
require 'concurrent'
require 'json'
require 'zold/score'
require_relative '../log'
+require_relative '../thread_pool'
require_relative '../age'
require_relative '../endless'
require_relative 'farmers'
# The farm of scores.
@@ -62,11 +63,11 @@
@log = log
@cache = File.expand_path(cache)
@invoice = invoice
@pipeline = Queue.new
@farmer = farmer
- @threads = []
+ @threads = ThreadPool.new('farm')
@lifetime = lifetime
@strength = strength
end
# Returns the list of best scores the farm managed to find up to now. The
@@ -80,25 +81,18 @@
def to_text
[
"Current time: #{Time.now.utc.iso8601}",
"Ruby processes: #{`ps ax | grep zold | wc -l`}",
JSON.pretty_generate(to_json),
- @threads.map do |t|
- trace = t.backtrace || []
- [
- "#{t.name}: status=#{t.status}; alive=#{t.alive?}",
- 'Vars: ' + t.thread_variables.map { |v| "#{v}=\"#{t.thread_variable_get(v)}\"" }.join('; '),
- " #{trace.join("\n ")}"
- ].join("\n")
- end
+ @threads.to_s
].flatten.join("\n\n")
end
# Renders the Farm into JSON to show for the end-user in front.rb.
def to_json
{
- threads: @threads.map { |t| "#{t.name}/#{t.status}/#{t.alive? ? 'alive' : 'dead'}" }.join(', '),
+ threads: @threads.to_json,
pipeline: @pipeline.size,
best: best.map(&:to_mnemo).join(', '),
farmer: @farmer.class.name
}
end
@@ -119,57 +113,54 @@
if best.empty?
@log.info("No scores found in the cache at #{@cache}")
else
@log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}")
end
- @threads = (1..threads).map do |t|
- Thread.start do
+ (1..threads).map do |t|
+ @threads.add do
Thread.current.thread_variable_set(:tid, t.to_s)
Endless.new("f#{t}", log: @log).run do
cycle(host, port, threads)
end
end
end
unless threads.zero?
ready = false
- @threads << Thread.start do
+ @threads.add do
Endless.new('cleanup', log: @log).run do
cleanup(host, port, threads)
ready = true
sleep(1)
end
end
loop { break if ready }
end
- if @threads.empty?
+ if threads.zero?
cleanup(host, port, threads)
@log.info("Farm started with no threads (there will be no score) at #{host}:#{port}")
else
@log.info("Farm started with #{@threads.count} threads (one for cleanup) \
at #{host}:#{port}, strength is #{@strength}")
end
begin
yield(self)
ensure
- @log.info("Farm stopping with #{threads} threads...")
- @threads.each(&:kill)
- @threads.each(&:join)
- @log.info("Farm stopped, #{threads} threads killed")
+ @threads.kill
end
end
private
def cleanup(host, port, threads)
scores = load
before = scores.map(&:value).max.to_i
save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: @strength)])
scores = load
- free = scores.reject { |s| @threads.find { |t| t.name == s.to_mnemo } }
+ free = scores.reject { |s| @threads.exists?(s.to_mnemo) }
@pipeline << free[0] if @pipeline.size.zero? && !free.empty?
after = scores.map(&:value).max.to_i
return unless before != after && !after.zero?
- @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0]}")
+ @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0].reduced(4)}")
end
def cycle(host, port, threads)
s = []
loop do