lib/zold/node/async_entrance.rb in zold-0.16.12 vs lib/zold/node/async_entrance.rb in zold-0.16.13

- old
+ new

@@ -20,123 +20,94 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. require 'concurrent' require 'futex' -require_relative '../log' +require 'securerandom' require_relative '../age' require_relative '../size' require_relative '../id' -require_relative '../verbose_thread' +require_relative '../endless' require_relative '../dir_items' # The async entrance of the web front. # Author:: Yegor Bugayenko (yegor256@gmail.com) # Copyright:: Copyright (c) 2018 Yegor Bugayenko # License:: MIT module Zold # The entrance class AsyncEntrance - # How many threads to use for processing - THREADS = [Concurrent.processor_count, 4].max - - # Queue length - MAX_QUEUE = Concurrent.processor_count * 64 - - def initialize(entrance, dir, log: Log::Quiet.new) + def initialize(entrance, dir, log: Log::Quiet.new, threads: [Concurrent.processor_count, 4].max) @entrance = entrance @dir = dir @log = log + @total = threads @mutex = Mutex.new end def to_json - opts = queue - json = { - 'queue': opts.count, - 'pool.length': @pool.length, - 'pool.running': @pool.running? - } - json['queue_age'] = opts.empty? ? 0 : Time.now - File.mtime(File.join(@dir, opts[0])) - @entrance.to_json.merge(json) + @entrance.to_json.merge( + 'queue': queue.count, + 'threads': @threads.count + ) end def start + raise 'Block must be given to start()' unless block_given? @entrance.start do FileUtils.mkdir_p(@dir) - @pool = Concurrent::FixedThreadPool.new( - AsyncEntrance::THREADS, - max_queue: AsyncEntrance::MAX_QUEUE, - fallback_policy: :abort - ) - AsyncEntrance::THREADS.times do |t| - @pool.post do - Thread.current.name = "async-e##{t}" - loop do - VerboseThread.new(@log).run(true) { take } - break if @pool.shuttingdown? - sleep(1 + Random.rand(100) / 100) + @threads = (0..@total - 1).map do |i| + Thread.start do + Endless.new("async-e##{i}", log: @log).run do + take + sleep(1) end end end begin yield(self) - cycle = 0 - until queue.empty? - @log.info("Stopping async entrance, #{queue.count} still in the queue (cycle=#{cycle})...") - cycle += 1 - raise "Can't wait for async entrance to stop for so long" if cycle > 10 - sleep 1 - end ensure - @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}") - @pool.shutdown - if @pool.wait_for_termination(10) - @log.info("Async entrance terminated peacefully with #{queue.count} wallets left in the queue") - else - @pool.kill - @log.info("Async entrance was killed, #{queue.count} wallets left in the queue") - end + @threads.each(&:kill) end end end # Always returns an array with a single ID of the pushed wallet def push(id, body) - raise "Queue is too long (#{queue.count} wallets), try again later" if queue.count > AsyncEntrance::MAX_QUEUE + raise "Queue is too long (#{queue.count} wallets), try again later" if queue.count > 256 start = Time.now - Futex.new(file(id), log: @log).open do |f| - IO.write(f, body) + loop do + uuid = SecureRandom.uuid + file = File.join(@dir, "#{id}-#{uuid}") + next if File.exist?(file) + IO.write(file, body) + @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{queue.count} \ +in #{Age.new(start, limit: 0.05)}: #{uuid}") + break end - @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{queue.count} \ -in #{Age.new(start, limit: 0.05)}") [id] end private def take start = Time.now - opts = queue - return if opts.empty? - id = opts[0] - Thread.current.thread_variable_set(:wallet, id.to_s) - body = Futex.new(file(id), log: @log).open do |f| - b = File.exist?(f) ? IO.read(f) : '' - FileUtils.rm_f(f) - b + id, body = @mutex.synchronize do + opts = queue + return if opts.empty? + file = File.join(@dir, opts[0]) + id = opts[0].split('-')[0] + Thread.current.thread_variable_set(:wallet, id) + body = IO.read(file) + FileUtils.rm_f(file) + [id, body] end - return if body.empty? @entrance.push(Id.new(id), body) @log.debug("Pushed #{id}/#{Size.new(body.length)} to #{@entrance.class.name} \ in #{Age.new(start, limit: 0.1)} (#{queue.count} still in the queue)") end def queue - DirItems.new(@dir).fetch.select { |f| f =~ /^[0-9a-f]{16}$/ } - end - - def file(id) - File.join(@dir, id.to_s) + DirItems.new(@dir).fetch.select { |f| f =~ /^[0-9a-f]{16}-/ } end end end