Sha256: d1d1cb402bd5c637060557370139f5289d7d11054ce43382bbc06f1aa87c420f
Contents?: true
Size: 1.9 KB
Versions: 1
Compression:
Stored size: 1.9 KB
Contents
require 'async' module Async class ResourcePool VERSION = '0.1.0' class Error < StandardError end class DoesNotOwnError < Error def initialize super('current fiber does not own this resource') end end class AlreadyOwnError < Error def initialize super('current fiber already own this resource') end end class TimeoutError < Error def initialize(timeout) super("timeout #{timeout} seconds was elapsed") end end def initialize(limit) @limit = limit @owners = [] @waiters = [] end def acquire(timeout = nil) raise AlreadyOwnError.new if already_acquired? unless can_be_acquired? timeout.nil? ? wait : wait_with_timeout(timeout) end @owners.push(Fiber.current) if block_given? begin yield ensure release end end nil end def release raise DoesNotOwnError.new unless already_acquired? @owners.delete(Fiber.current) wakeup end def already_acquired? @owners.include?(Fiber.current) end def can_be_acquired? @owners.size < @limit end def info { waiters: @waiters.size, owners: @owners.size, limit: @limit } end private def wakeup return if @waiters.empty? fiber = @waiters.shift fiber.resume if fiber.alive? end def wait @waiters.push(Fiber.current) Async::Task.yield end def wait_with_timeout(timeout) fiber = Fiber.current @waiters.push(Fiber.current) Async::Task.current.with_timeout(timeout) do |timer| begin Async::Task.yield timer.cancel rescue Async::TimeoutError => _ @waiters.delete(fiber) raise TimeoutError.new(timeout) end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
async_resource_pool-0.1.0 | lib/async/resource_pool.rb |