Sha256: b1d60b1940360ad3736d1cfa6f94fc506bf96bdb6f795c1f526e52a709c09c5c

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

require "thread"
require "concurrent"

module LogStash
  # Minimal subset implement of a SizedQueue supporting
  # a timeout option on the lock.
  #
  # This will be part of the main Logstash's sized queue
  class SizedQueueTimeout
    class TimeoutError < StandardError; end

    DEFAULT_TIMEOUT = 2 # in seconds

    def initialize(max_size, options = {})
      # `concurrent-ruby` are deprecating the `Condition`
      # in favor of a Synchonization class that you need to implement.
      # this was bit overkill to only check if the wait did a timeout.
      @condition_in = ConditionVariable.new
      @condition_out = ConditionVariable.new

      @max_size = max_size
      @queue = []
      @mutex = Mutex.new
    end

    def push(obj, timeout = DEFAULT_TIMEOUT)
      @mutex.synchronize do
        while full? # wake up check
          start_time = Concurrent.monotonic_time
          @condition_out.wait(@mutex, timeout) 
          if start_time + timeout - Concurrent.monotonic_time  < 0
            raise TimeoutError
          end
        end

        @queue << obj
        @condition_in.signal

        return obj
      end
    end
    alias_method :<<, :push

    def size
      @mutex.synchronize { @queue.size }
    end

    def pop_no_timeout
      @mutex.synchronize do
        @condition_in.wait(@mutex) while @queue.empty? # Wake up check

        obj = @queue.shift
        @condition_out.signal

        return obj
      end
    end

    private
    def full?
      @queue.size == @max_size
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
logstash-input-lumberjack-1.0.3 lib/logstash/sized_queue_timeout.rb