lib/zold/node/async_entrance.rb in zold-0.16.12 vs lib/zold/node/async_entrance.rb in zold-0.16.13
- old
+ new
@@ -20,123 +20,94 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
require 'concurrent'
require 'futex'
-require_relative '../log'
+require 'securerandom'
require_relative '../age'
require_relative '../size'
require_relative '../id'
-require_relative '../verbose_thread'
+require_relative '../endless'
require_relative '../dir_items'
# The async entrance of the web front.
# Author:: Yegor Bugayenko (yegor256@gmail.com)
# Copyright:: Copyright (c) 2018 Yegor Bugayenko
# License:: MIT
module Zold
# The entrance
class AsyncEntrance
- # How many threads to use for processing
- THREADS = [Concurrent.processor_count, 4].max
-
- # Queue length
- MAX_QUEUE = Concurrent.processor_count * 64
-
- def initialize(entrance, dir, log: Log::Quiet.new)
+ def initialize(entrance, dir, log: Log::Quiet.new, threads: [Concurrent.processor_count, 4].max)
@entrance = entrance
@dir = dir
@log = log
+ @total = threads
@mutex = Mutex.new
end
def to_json
- opts = queue
- json = {
- 'queue': opts.count,
- 'pool.length': @pool.length,
- 'pool.running': @pool.running?
- }
- json['queue_age'] = opts.empty? ? 0 : Time.now - File.mtime(File.join(@dir, opts[0]))
- @entrance.to_json.merge(json)
+ @entrance.to_json.merge(
+ 'queue': queue.count,
+ 'threads': @threads.count
+ )
end
def start
+ raise 'Block must be given to start()' unless block_given?
@entrance.start do
FileUtils.mkdir_p(@dir)
- @pool = Concurrent::FixedThreadPool.new(
- AsyncEntrance::THREADS,
- max_queue: AsyncEntrance::MAX_QUEUE,
- fallback_policy: :abort
- )
- AsyncEntrance::THREADS.times do |t|
- @pool.post do
- Thread.current.name = "async-e##{t}"
- loop do
- VerboseThread.new(@log).run(true) { take }
- break if @pool.shuttingdown?
- sleep(1 + Random.rand(100) / 100)
+ @threads = (0..@total - 1).map do |i|
+ Thread.start do
+ Endless.new("async-e##{i}", log: @log).run do
+ take
+ sleep(1)
end
end
end
begin
yield(self)
- cycle = 0
- until queue.empty?
- @log.info("Stopping async entrance, #{queue.count} still in the queue (cycle=#{cycle})...")
- cycle += 1
- raise "Can't wait for async entrance to stop for so long" if cycle > 10
- sleep 1
- end
ensure
- @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
- @pool.shutdown
- if @pool.wait_for_termination(10)
- @log.info("Async entrance terminated peacefully with #{queue.count} wallets left in the queue")
- else
- @pool.kill
- @log.info("Async entrance was killed, #{queue.count} wallets left in the queue")
- end
+ @threads.each(&:kill)
end
end
end
# Always returns an array with a single ID of the pushed wallet
def push(id, body)
- raise "Queue is too long (#{queue.count} wallets), try again later" if queue.count > AsyncEntrance::MAX_QUEUE
+ raise "Queue is too long (#{queue.count} wallets), try again later" if queue.count > 256
start = Time.now
- Futex.new(file(id), log: @log).open do |f|
- IO.write(f, body)
+ loop do
+ uuid = SecureRandom.uuid
+ file = File.join(@dir, "#{id}-#{uuid}")
+ next if File.exist?(file)
+ IO.write(file, body)
+ @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{queue.count} \
+in #{Age.new(start, limit: 0.05)}: #{uuid}")
+ break
end
- @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{queue.count} \
-in #{Age.new(start, limit: 0.05)}")
[id]
end
private
def take
start = Time.now
- opts = queue
- return if opts.empty?
- id = opts[0]
- Thread.current.thread_variable_set(:wallet, id.to_s)
- body = Futex.new(file(id), log: @log).open do |f|
- b = File.exist?(f) ? IO.read(f) : ''
- FileUtils.rm_f(f)
- b
+ id, body = @mutex.synchronize do
+ opts = queue
+ return if opts.empty?
+ file = File.join(@dir, opts[0])
+ id = opts[0].split('-')[0]
+ Thread.current.thread_variable_set(:wallet, id)
+ body = IO.read(file)
+ FileUtils.rm_f(file)
+ [id, body]
end
- return if body.empty?
@entrance.push(Id.new(id), body)
@log.debug("Pushed #{id}/#{Size.new(body.length)} to #{@entrance.class.name} \
in #{Age.new(start, limit: 0.1)} (#{queue.count} still in the queue)")
end
def queue
- DirItems.new(@dir).fetch.select { |f| f =~ /^[0-9a-f]{16}$/ }
- end
-
- def file(id)
- File.join(@dir, id.to_s)
+ DirItems.new(@dir).fetch.select { |f| f =~ /^[0-9a-f]{16}-/ }
end
end
end