lib/zold/node/farm.rb in zold-0.16.12 vs lib/zold/node/farm.rb in zold-0.16.13
- old
+ new
@@ -22,15 +22,16 @@
require 'time'
require 'open3'
require 'backtrace'
require 'futex'
+require 'concurrent'
require 'json'
require 'zold/score'
require_relative '../log'
require_relative '../age'
-require_relative '../verbose_thread'
+require_relative '../endless'
require_relative 'farmers'
# The farm of scores.
# Author:: Yegor Bugayenko (yegor256@gmail.com)
# Copyright:: Copyright (c) 2018 Yegor Bugayenko
@@ -94,11 +95,10 @@
def to_json
{
threads: @threads.map do |t|
"#{t.name}/#{t.status}/#{t.alive? ? 'alive' : 'dead'}"
end.join(', '),
- cleanup: @cleanup.status,
pipeline: @pipeline.size,
best: best.map(&:to_mnemo).join(', ')
}
end
@@ -110,45 +110,40 @@
# # Everything else...
# end
#
# The farm will stop all its threads and close all resources safely
# right after the block provided exists.
- def start(host, port, strength: 8, threads: 8)
+ def start(host, port, strength: Score::STRENGTH, threads: Concurrent.processor_count)
raise 'Block is required for the farm to start' unless block_given?
@log.info('Zero-threads farm won\'t score anything!') if threads.zero?
if best.empty?
- @log.info("No scores found in cache at #{@cache}")
+ @log.info("No scores found in the cache at #{@cache}")
else
@log.info("#{best.size} scores pre-loaded from #{@cache}, the best is: #{best[0]}")
end
- cleanup(host, port, strength, threads)
@threads = (1..threads).map do |t|
+ Thread.current.thread_variable_set(:tid, t.to_s)
Thread.new do
- Thread.current.abort_on_exception = true
- Thread.current.name = "f#{t}"
- loop do
- VerboseThread.new(@log).run do
- cycle(host, port, strength, threads)
- end
+ Endless.new("f#{t}", log: @log).run do
+ cycle(host, port, strength, threads)
end
end
end
- @cleanup = Thread.new do
- Thread.current.abort_on_exception = true
- Thread.current.name = 'cleanup'
- loop do
- VerboseThread.new(@log).run(true) do
- cleanup(host, port, strength, threads)
- end
+ ready = false
+ @threads << Thread.new do
+ Endless.new('cleanup', log: @log).run do
+ cleanup(host, port, strength, threads)
+ ready = true
+ sleep(1)
end
end
- @log.info("Farm started with #{@threads.count} threads at #{host}:#{port}, strength is #{strength}")
- return unless block_given?
+ @log.info("Farm started with #{@threads.count} threads (one for cleanup) \
+at #{host}:#{port}, strength is #{strength}")
+ loop { break if ready }
begin
yield(self)
ensure
- @cleanup.kill
@threads.each(&:kill)
@log.info("Farm stopped (threads=#{threads}, strength=#{strength})")
end
end
@@ -170,11 +165,11 @@
s = []
loop do
begin
s << @pipeline.pop(true)
rescue ThreadError => _
- sleep 0.25
+ sleep(0.25)
end
s.compact!
break unless s.empty?
end
s = s[0]
@@ -191,11 +186,11 @@
end
def save(threads, list = [])
scores = load + list
period = @lifetime / [threads, 1].max
- Futex.new(@cache, log: @log).open do |f|
+ Futex.new(@cache).open do |f|
IO.write(
f,
scores.select(&:valid?)
.reject(&:expired?)
.sort_by(&:value)
@@ -208,22 +203,19 @@
)
end
end
def load
- Futex.new(@cache, log: @log).open do |f|
- if File.exist?(f)
- IO.read(f).split(/\n/).map do |t|
- begin
- Score.parse(t)
- rescue StandardError => e
- @log.error(Backtrace.new(e).to_s)
- nil
- end
- end.compact
- else
- []
- end
+ return [] unless File.exist?(@cache)
+ Futex.new(@cache).open do |f|
+ IO.read(f).split(/\n/).map do |t|
+ begin
+ Score.parse(t)
+ rescue StandardError => e
+ @log.error(Backtrace.new(e).to_s)
+ nil
+ end
+ end.compact
end
end
end
end