lib/zold/node/async_entrance.rb in zold-0.13.46 vs lib/zold/node/async_entrance.rb in zold-0.14.0
- old
+ new
@@ -18,33 +18,50 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
require 'concurrent'
require_relative '../log'
+require_relative '../id'
require_relative '../verbose_thread'
# The async entrance of the web front.
# Author:: Yegor Bugayenko (yegor256@gmail.com)
# Copyright:: Copyright (c) 2018 Yegor Bugayenko
# License:: MIT
module Zold
# The entrance
class AsyncEntrance
- def initialize(entrance, log: Log::Quiet.new)
+ THREADS = Concurrent.processor_count * 4
+
+ def initialize(entrance, dir, log: Log::Quiet.new)
raise 'Entrance can\'t be nil' if entrance.nil?
@entrance = entrance
+ raise 'Directory can\'t be nil' if dir.nil?
+ raise 'Directory must be of type String' unless dir.is_a?(String)
+ @dir = dir
raise 'Log can\'t be nil' if log.nil?
@log = log
+ @mutex = Mutex.new
end
def start
@entrance.start do
+ FileUtils.mkdir_p(@dir)
@pool = Concurrent::FixedThreadPool.new(
- Concurrent.processor_count * 8,
- max_queue: Concurrent.processor_count * 32,
- fallback_policy: :abort
+ AsyncEntrance::THREADS, max_queue: AsyncEntrance::THREADS, fallback_policy: :abort
)
+ AsyncEntrance::THREADS.times do
+ @pool.post do
+ VerboseThread.new(@log).run(true) do
+ loop do
+ take
+ break if @pool.shuttingdown?
+ sleep Random.rand(100) / 100
+ end
+ end
+ end
+ end
begin
yield(self)
ensure
@log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
@pool.shutdown
@@ -67,15 +84,37 @@
'pool.running': @pool.running?
)
end
def push(id, body)
- @pool.post do
- VerboseThread.new(@log).run(true) do
- @entrance.push(id, body)
+ @mutex.synchronize do
+ AtomicFile.new(File.join(@dir, id.to_s)).write(body)
+ end
+ end
+
+ private
+
+ def take
+ id = ''
+ body = ''
+ @mutex.synchronize do
+ opts = queue
+ unless opts.empty?
+ file = File.join(@dir, opts[0])
+ id = opts[0]
+ body = File.read(file)
+ File.delete(file)
end
end
- @log.debug("Pushed #{id}/#{body.length}b to #{@entrance.class.name}, \
-pool: #{@pool.length}/#{@pool.queue_length}")
+ return if id.empty? || body.empty?
+ start = Time.now
+ @entrance.push(Id.new(id), body)
+ @log.debug("Pushed #{id}/#{body.length}b to #{@entrance.class.name} in #{(Time.now - start).round}s")
+ end
+
+ def queue
+ Dir.new(@dir)
+ .select { |f| f =~ /^[0-9a-f]{16}$/ }
+ .sort_by { |f| File.mtime(File.join(@dir, f)) }
end
end
end