lib/zold/node/async_entrance.rb in zold-0.13.46 vs lib/zold/node/async_entrance.rb in zold-0.14.0

- old
+ new

@@ -18,33 +18,50 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. require 'concurrent' require_relative '../log' +require_relative '../id' require_relative '../verbose_thread' # The async entrance of the web front. # Author:: Yegor Bugayenko (yegor256@gmail.com) # Copyright:: Copyright (c) 2018 Yegor Bugayenko # License:: MIT module Zold # The entrance class AsyncEntrance - def initialize(entrance, log: Log::Quiet.new) + THREADS = Concurrent.processor_count * 4 + + def initialize(entrance, dir, log: Log::Quiet.new) raise 'Entrance can\'t be nil' if entrance.nil? @entrance = entrance + raise 'Directory can\'t be nil' if dir.nil? + raise 'Directory must be of type String' unless dir.is_a?(String) + @dir = dir raise 'Log can\'t be nil' if log.nil? @log = log + @mutex = Mutex.new end def start @entrance.start do + FileUtils.mkdir_p(@dir) @pool = Concurrent::FixedThreadPool.new( - Concurrent.processor_count * 8, - max_queue: Concurrent.processor_count * 32, - fallback_policy: :abort + AsyncEntrance::THREADS, max_queue: AsyncEntrance::THREADS, fallback_policy: :abort ) + AsyncEntrance::THREADS.times do + @pool.post do + VerboseThread.new(@log).run(true) do + loop do + take + break if @pool.shuttingdown? + sleep Random.rand(100) / 100 + end + end + end + end begin yield(self) ensure @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}") @pool.shutdown @@ -67,15 +84,37 @@ 'pool.running': @pool.running? ) end def push(id, body) - @pool.post do - VerboseThread.new(@log).run(true) do - @entrance.push(id, body) + @mutex.synchronize do + AtomicFile.new(File.join(@dir, id.to_s)).write(body) + end + end + + private + + def take + id = '' + body = '' + @mutex.synchronize do + opts = queue + unless opts.empty? + file = File.join(@dir, opts[0]) + id = opts[0] + body = File.read(file) + File.delete(file) end end - @log.debug("Pushed #{id}/#{body.length}b to #{@entrance.class.name}, \ -pool: #{@pool.length}/#{@pool.queue_length}") + return if id.empty? || body.empty? + start = Time.now + @entrance.push(Id.new(id), body) + @log.debug("Pushed #{id}/#{body.length}b to #{@entrance.class.name} in #{(Time.now - start).round}s") + end + + def queue + Dir.new(@dir) + .select { |f| f =~ /^[0-9a-f]{16}$/ } + .sort_by { |f| File.mtime(File.join(@dir, f)) } end end end