# frozen_string_literal: true

# Copyright (c) 2018-2019 Zerocracy, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

require 'time'
require 'open3'
require 'backtrace'
require 'futex'
require 'concurrent'
require 'json'
require 'zold/score'
require_relative '../log'
require_relative '../thread_pool'
require_relative '../age'
require_relative '../endless'
require_relative 'farmers'

# The farm of scores.
# Author:: Yegor Bugayenko (yegor256@gmail.com)
# Copyright:: Copyright (c) 2018 Yegor Bugayenko
# License:: MIT
module Zold
  # Farm: keeps a file-backed list of scores and runs background threads
  # that pick scores from an in-memory pipeline and pass them to a farmer.
  class Farm
    # Empty farm: a stand-in that exposes the same +best+ reader as Farm
    # but never has any scores.
    class Empty
      # Always returns an empty list.
      def best
        []
      end
    end

    # Makes an instance of a farm. There should be only one farm in the entire
    # application, but you can, of course, start as many of them as necessary
    # for the purpose of unit testing.
    #
    # +invoice+ is the invoice placed into every initial score the farm creates.
    #
    # +cache+ is the file where the farm will keep all the scores it
    # manages to find. If the file is absent, it will be created, together with
    # the necessary parent directories. The path is expanded to an absolute one.
    #
    # +log+ is the logger used for all messages.
    #
    # +farmer+ is the object whose +up+ method is called to produce the next
    # score from the current one.
    #
    # +lifetime+ is the amount of seconds for a score to live in the farm, by
    # default it's the entire day, since the Score expires in 24 hours; can be
    # decreased for the purpose of unit testing.
    #
    # +strength+ is the minimal strength of the scores the farm keeps.
    def initialize(invoice, cache = File.join(Dir.pwd, 'farm'), log: Log::NULL,
      farmer: Farmers::Plain.new, lifetime: 24 * 60 * 60, strength: Score::STRENGTH)
      @log = log
      @cache = File.expand_path(cache)
      @invoice = invoice
      @pipeline = Queue.new
      @farmer = farmer
      @threads = ThreadPool.new('farm', log: log)
      @lifetime = lifetime
      @strength = strength
    end

    # Returns the list of best scores the farm managed to find up to now,
    # as they are stored in the cache file.
    #
    # The list is empty while the cache file doesn't exist yet (for example,
    # before the farm is started for the first time). Once the farm is started,
    # every cleanup adds a fresh initial score for the current host and port,
    # so an empty list at that point most probably indicates a bug.
    def best
      load
    end

    # Renders the farm as a plain text report: current time, the number of
    # matching processes reported by the shell, the JSON summary and the
    # textual dump of the thread pool.
    def to_text
      [
        "Current time: #{Time.now.utc.iso8601}",
        "Ruby processes: #{`ps ax | grep zold | wc -l`}",
        JSON.pretty_generate(to_json),
        @threads.to_s
      ].flatten.join("\n\n")
    end

    # Renders the Farm into JSON to show for the end-user in front.rb.
    # Returns a Hash with +threads+, +pipeline+ (queue size), +best+
    # (comma-separated mnemos) and +farmer+ (class name of the farmer).
    def to_json
      {
        threads: @threads.to_json,
        pipeline: @pipeline.size,
        best: best.map(&:to_mnemo).join(', '),
        farmer: @farmer.class.name
      }
    end

    # Starts a farm, all threads, and yields the block provided. You are
    # supposed to use it only with the block:
    #
    #  Farm.new(invoice).start('example.org', 4096) do |farm|
    #    score = farm.best[0]
    #    # Everything else...
    #  end
    #
    # +host+ and +port+ identify the node the scores are calculated for,
    # +threads+ is the number of farming threads (one more thread is added
    # for the periodic cleanup, unless +threads+ is zero).
    #
    # The farm will stop all its threads and close all resources safely
    # right after the block provided exits.
    #
    # Raises RuntimeError if no block is given.
    def start(host, port, threads: Concurrent.processor_count)
      raise 'Block is required for the farm to start' unless block_given?
      @log.info('Zero-threads farm won\'t score anything!') if threads.zero?
      # Read the cache once: every call to #best locks and re-reads the file.
      preloaded = best
      if preloaded.empty?
        @log.info("No scores found in the cache at #{@cache}")
      else
        @log.info("#{preloaded.size} scores pre-loaded from #{@cache}, the best is: #{preloaded[0]}")
      end
      (1..threads).each do |t|
        @threads.add do
          Thread.current.thread_variable_set(:tid, t.to_s)
          Endless.new("f#{t}", log: @log).run do
            cycle(host, port, threads)
          end
        end
      end
      if threads.zero?
        # No background cleanup thread in this mode, so seed the cache inline.
        cleanup(host, port, threads)
        @log.info("Farm started with no threads (there will be no score) at #{host}:#{port}")
      else
        ready = false
        @threads.add do
          Endless.new('cleanup', log: @log).run do
            cleanup(host, port, threads)
            ready = true
            sleep(1)
          end
        end
        # Wait for the first cleanup to finish; poll with a short sleep
        # instead of spinning, so the cleanup thread is not starved.
        sleep(0.01) until ready
        @log.info(
          "Farm started with #{@threads.count} threads (one for cleanup) " \
          "at #{host}:#{port}, strength is #{@strength}"
        )
      end
      begin
        yield(self)
      ensure
        @threads.kill
      end
    end

    private

    # Adds a fresh initial score for the given host and port to the cache,
    # prunes the cache (see #save) and, if the pipeline is empty, puts into it
    # the first stored score that no thread is currently named after.
    # Logs the best score when the best value has changed.
    def cleanup(host, port, threads)
      before = load.map(&:value).max.to_i
      save(threads, [Score.new(host: host, port: port, invoice: @invoice, strength: @strength)])
      scores = load
      # Threads rename themselves to the mnemo of the score they work on
      # (see #cycle), which is how busy scores are recognized here.
      free = scores.reject { |s| @threads.exists?(s.to_mnemo) }
      @pipeline << free.first if @pipeline.empty? && !free.empty?
      after = scores.map(&:value).max.to_i
      return if before == after || after.zero?
      @log.debug("#{Thread.current.name}: best score of #{scores.count} is #{scores[0].reduced(4)}")
    end

    # Takes one score from the pipeline (polling every 0.25 seconds while it
    # is empty), asks the farmer for the next score, stores it and runs
    # a cleanup. Scores that are invalid, belong to another host/port or are
    # weaker than required are silently dropped.
    def cycle(host, port, threads)
      s = nil
      while s.nil?
        begin
          # Non-blocking pop raises ThreadError when the queue is empty.
          s = @pipeline.pop(true)
        rescue ThreadError
          sleep(0.25)
        end
      end
      return unless s.valid?
      return unless s.host == host
      return unless s.port == port
      return unless s.strength >= @strength
      Thread.current.name = s.to_mnemo
      Thread.current.thread_variable_set(:start, Time.now.utc.iso8601)
      score = @farmer.up(s)
      @log.debug("New score discovered: #{score}") if @strength > 4
      save(threads, [score])
      cleanup(host, port, threads)
    end

    # Merges +list+ into the scores already stored in the cache file and
    # rewrites the file. Only valid, non-expired and strong enough scores
    # are kept, sorted by value in descending order, one per creation time
    # and one per age slot, where the slot length is the lifetime divided
    # by the number of threads.
    #
    # NOTE(review): reading and writing are done under two separate locks,
    # so concurrent savers may overwrite each other's additions -- confirm
    # whether this is acceptable.
    def save(threads, list = [])
      scores = load + list
      period = @lifetime / [threads, 1].max
      # An integer lifetime smaller than the number of threads gives a zero
      # period, which would break the division below.
      period = 1 if period.zero?
      body = scores.select(&:valid?)
        .reject(&:expired?)
        .reject { |s| s.strength < @strength }
        .sort_by(&:value)
        .reverse
        .uniq(&:time)
        .uniq { |s| (s.age / period).round }
        .map(&:to_s)
        .uniq
        .join("\n")
      # File.write, unlike IO.write, never treats the path as a command.
      Futex.new(@cache).open { |f| File.write(f, body) }
    end

    # Reads all scores from the cache file. Returns an empty list if the
    # file is absent. Blank lines are skipped; lines that can't be parsed
    # are reported to the log and skipped too.
    def load
      return [] unless File.exist?(@cache)
      lines = Futex.new(@cache).open(false) { |f| File.readlines(f, "\n") }
      # Lines come with their terminators, so strip them before the
      # emptiness check, otherwise blank lines are never filtered out.
      lines.map(&:strip).reject(&:empty?).map do |t|
        Score.parse(t)
      rescue StandardError => e
        @log.error(Backtrace.new(e).to_s)
        nil
      end.compact
    end
  end
end