Sha256: af0a131fdd96d5a5849c15f0bd7c633ae65f9b3a8b39114af83ca4a735670640

Contents?: true

Size: 1.6 KB

Versions: 30

Compression:

Stored size: 1.6 KB

Contents

# frozen_string_literal: true

module Karafka
  module Processing
    # Minimal queue with timeout for Ruby 3.1 and lower.
    #
    # It is needed because only since 3.2, Ruby has a timeout on `#pop`
    class TimedQueue
      include Karafka::Core::Helpers::Time

      def initialize
        @queue = Queue.new
        @mutex = Thread::Mutex.new
        @resource = Thread::ConditionVariable.new
      end

      # Adds element to the queue
      #
      # @param obj [Object] pushes an element onto the queue
      def push(obj)
        @mutex.synchronize do
          @queue << obj
          @resource.broadcast
        end
      end

      alias << push

      # No timeout means waiting up to 31 years
      #
      # @param timeout [Integer] max number of seconds to wait on the pop
      # @return [Object] element inserted on the array or `nil` on timeout
      #
      # @note We use timeout in seconds because this is how Ruby 3.2+ works and we want to have
      #   the same API for newer and older Ruby versions
      def pop(timeout: 10_000_000_000)
        deadline = monotonic_now + timeout * 1000

        @mutex.synchronize do
          loop do
            return @queue.pop unless @queue.empty?
            return @queue.pop if @queue.closed?

            to_wait = (deadline - monotonic_now) / 1_000.0

            return nil if to_wait <= 0

            @resource.wait(@mutex, to_wait)
          end
        end
      end

      # Closes the internal queue and releases the lock
      def close
        @mutex.synchronize do
          @queue.close
          @resource.broadcast
        end
      end
    end
  end
end

Version data entries

30 entries across 30 versions & 1 rubygems

Version Path
karafka-2.4.17 lib/karafka/processing/timed_queue.rb
karafka-2.4.16 lib/karafka/processing/timed_queue.rb
karafka-2.4.15 lib/karafka/processing/timed_queue.rb
karafka-2.4.14 lib/karafka/processing/timed_queue.rb
karafka-2.4.13 lib/karafka/processing/timed_queue.rb
karafka-2.4.12 lib/karafka/processing/timed_queue.rb
karafka-2.4.11 lib/karafka/processing/timed_queue.rb
karafka-2.4.10 lib/karafka/processing/timed_queue.rb
karafka-2.4.9 lib/karafka/processing/timed_queue.rb
karafka-2.4.8 lib/karafka/processing/timed_queue.rb
karafka-2.4.7 lib/karafka/processing/timed_queue.rb
karafka-2.4.6 lib/karafka/processing/timed_queue.rb
karafka-2.4.5 lib/karafka/processing/timed_queue.rb
karafka-2.4.4 lib/karafka/processing/timed_queue.rb
karafka-2.4.3 lib/karafka/processing/timed_queue.rb
karafka-2.4.0 lib/karafka/processing/timed_queue.rb
karafka-2.4.0.rc1 lib/karafka/processing/timed_queue.rb
karafka-2.3.4 lib/karafka/processing/timed_queue.rb
karafka-2.4.0.beta2 lib/karafka/processing/timed_queue.rb
karafka-2.4.0.beta1 lib/karafka/processing/timed_queue.rb