Sha256: 4c8cfccc10f7f8d5a8c78e381b40e9f597defea05d75c25032438005889730e4

Contents?: true

Size: 971 Bytes

Versions: 10

Compression:

Stored size: 971 Bytes

Contents

module Fasten
  # A small mutex-protected FIFO queue whose consumer can block until at
  # least one item is available, optionally giving up after a timeout, and
  # then drains everything that is queued in a single call.
  class TimeoutQueue
    def initialize
      @mutex = Mutex.new
      @queue = []
      @received = ConditionVariable.new
    end

    # Appends +object+ to the queue and signals the condition variable so a
    # consumer blocked in #receive_with_timeout can wake up.
    #
    # @param object [Object] the item to enqueue
    def push(object)
      @mutex.synchronize do
        @queue << object
        @received.signal
      end
    end

    # Removes and returns every item currently queued, in insertion order.
    #
    # @param timeout [Numeric, nil] +nil+ blocks until at least one item is
    #   available; a positive number waits at most that many seconds; zero
    #   (or a negative number) never blocks.
    # @return [Array] the drained items; empty when nothing arrived in time
    def receive_with_timeout(timeout = nil)
      @mutex.synchronize do
        wait_for_items(timeout)

        # Array#shift(n) removes and returns the first n elements at once.
        @queue.shift(@queue.size)
      end
    end

    private

    # Blocks until the queue is non-empty or the timeout elapses.
    # Must be called while holding @mutex.
    def wait_for_items(timeout)
      if timeout.nil?
        # Loop to guard against spurious wakeups.
        @received.wait(@mutex) while @queue.empty?
        return
      end

      # The deadline is tracked on the monotonic clock so that wall-clock
      # adjustments (NTP steps, manual changes) can neither cut the wait
      # short nor stretch it beyond the requested timeout.
      deadline = monotonic_now + timeout
      while @queue.empty? && (remaining = deadline - monotonic_now).positive?
        @received.wait(@mutex, remaining)
      end
    end

    # Current reading of the monotonic clock, in (fractional) seconds.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
fasten-0.18.0 lib/fasten/timeout_queue.rb
fasten-0.16.0 lib/fasten/timeout_queue.rb
fasten-0.14.4 lib/fasten/timeout_queue.rb
fasten-0.14.2 lib/fasten/timeout_queue.rb
fasten-0.14.0 lib/fasten/timeout_queue.rb
fasten-0.12.8 lib/fasten/timeout_queue.rb
fasten-0.12.6 lib/fasten/timeout_queue.rb
fasten-0.12.4 lib/fasten/timeout_queue.rb
fasten-0.12.2 lib/fasten/timeout_queue.rb
fasten-0.12.0 lib/fasten/timeout_queue.rb