Sha256: 4e630a5de18c33a3afccecb182c17f1ae3baeda57a80e0428277225917b16569

Contents?: true

Size: 987 Bytes

Versions: 10

Compression:

Stored size: 987 Bytes

Contents

module Fasten
  class TimeoutQueue
    def initialize
      @mutex = Mutex.new
      @queue = []
      @received = ConditionVariable.new
    end

    def push(object)
      @mutex.synchronize do
        @queue << object
        @received.signal
      end
    end

    def receive_with_timeout(timeout = nil) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
      @mutex.synchronize do
        if timeout.nil?
          # wait indefinitely until there is an element in the queue
          @received.wait(@mutex) while @queue.empty?
        elsif @queue.empty? && timeout != 0
          # wait for element or timeout
          timeout_time = timeout + Time.now.to_f
          while @queue.empty? && (remaining_time = timeout_time - Time.now.to_f).positive?
            @received.wait(@mutex, remaining_time)
          end
        end

        items = []
        items << @queue.shift until @queue.empty?

        items
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
fasten-0.10.0 lib/fasten/timeout_queue.rb
fasten-0.8.8 lib/fasten/timeout_queue.rb
fasten-0.8.6 lib/fasten/timeout_queue.rb
fasten-0.8.4 lib/fasten/timeout_queue.rb
fasten-0.8.0 lib/fasten/timeout_queue.rb
fasten-0.7.6 lib/fasten/timeout_queue.rb
fasten-0.7.4 lib/fasten/timeout_queue.rb
fasten-0.7.2 lib/fasten/timeout_queue.rb
fasten-0.7.0 lib/fasten/timeout_queue.rb
fasten-0.6.0 lib/fasten/timeout_queue.rb