Sha256: b740fcaf61cff603fa5ab20d1e9e2a0a5745b0c5c0e8cd91aedab2df96abb1f4

Contents?: true

Size: 807 Bytes

Versions: 3

Compression:

Stored size: 807 Bytes

Contents

# https://vaneyckt.io/posts/ruby_concurrency_building_a_timeout_queue/
class TimeoutQueue
  def initialize
    @elems = []
    @mutex = Mutex.new
    @cond_var = ConditionVariable.new
  end

  def <<(elem)
    @mutex.synchronize do
      @elems << elem
      @cond_var.signal
    end
  end

  def pop(blocking = true, timeout = nil)
    @mutex.synchronize do
      if blocking
        if timeout.nil?
          while @elems.empty?
            @cond_var.wait(@mutex)
          end
        else
          timeout_time = Time.now.to_f + timeout
          while @elems.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
            @cond_var.wait(@mutex, remaining_time)
          end
        end
      end
      raise ThreadError, 'queue empty' if @elems.empty?
      @elems.shift
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
substrate_client.rb-0.1.7 lib/timeout_queue.rb
substrate_client.rb-0.1.6 lib/timeout_queue.rb
substrate_client.rb-0.1.5 lib/timeout_queue.rb