Sha256: 4c8cfccc10f7f8d5a8c78e381b40e9f597defea05d75c25032438005889730e4
Contents?: true
Size: 971 Bytes
Versions: 10
Compression:
Stored size: 971 Bytes
Contents
module Fasten
  # A small thread-safe queue whose consumer drains every pending item at
  # once, optionally blocking (with or without a timeout) until at least one
  # item is available.
  class TimeoutQueue
    def initialize
      @mutex = Mutex.new
      @queue = []
      @received = ConditionVariable.new
    end

    # Appends +object+ to the queue and wakes one thread blocked in
    # #receive_with_timeout, if any.
    #
    # @param object [Object] the item to enqueue
    def push(object)
      @mutex.synchronize do
        @queue << object
        @received.signal
      end
    end

    # Removes and returns every item currently queued, in insertion order.
    #
    # @param timeout [Numeric, nil] how long to wait, in seconds, for an item
    #   when the queue is empty. +nil+ waits indefinitely; zero or a negative
    #   value does not wait at all.
    # @return [Array] the drained items; empty if the timeout elapsed (or no
    #   wait was requested) while the queue was still empty
    def receive_with_timeout(timeout = nil)
      @mutex.synchronize do
        wait_for_items(timeout)
        # Drain everything in one call while still holding the lock.
        @queue.shift(@queue.size)
      end
    end

    private

    # Blocks until the queue is non-empty or +timeout+ seconds have elapsed.
    # Must be called with @mutex held.
    def wait_for_items(timeout)
      return unless @queue.empty?

      if timeout.nil?
        # Loop to guard against spurious wake-ups and against another
        # consumer draining the queue before this thread re-acquires the lock.
        @received.wait(@mutex) while @queue.empty?
        return
      end

      # The deadline is tracked on the monotonic clock so that wall-clock
      # adjustments (NTP steps, manual changes) can neither cut the wait
      # short nor extend it.
      deadline = monotonic_now + timeout
      while @queue.empty?
        remaining = deadline - monotonic_now
        break unless remaining.positive?

        @received.wait(@mutex, remaining)
      end
    end

    # Current reading of the monotonic clock, in (fractional) seconds.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end
Version data entries
10 entries across 10 versions & 1 rubygems