Sha256: 0282b75780f570344b6a23284610b484ebb31db4e5cd1e3a36980b89d0d4ba4f

Contents?: true

Size: 1.46 KB

Versions: 3

Compression:

Stored size: 1.46 KB

Contents

require "agent/errors"

module Agent
  class Queue
    # A queue variant with a bounded buffer. It tracks how many items are
    # currently buffered (+size+) against a fixed capacity (+max+): pushes are
    # accepted while there is spare capacity, and pops are served while at
    # least one item is buffered.
    #
    # NOTE(review): relies on +queue+, +operations+, +pushes+ and +pops+
    # accessors (and the +@queue+ ivar) that are presumably provided by the
    # parent Queue class -- they are not defined in this file.
    class Buffered < Queue
      attr_reader :size, :max

      # @param type forwarded unchanged to the parent initializer
      # @param max [Integer] buffer capacity; defaults to 1
      # @raise [Errors::InvalidQueueSize] when +max+ is less than 1
      def initialize(type, max = 1)
        raise Errors::InvalidQueueSize, "queue size must be at least 1" unless max >= 1
        super(type)
        @max = max
      end

      def buffered?;   true; end
      def unbuffered?; false;  end

      # True while the buffer has spare capacity.
      def push?; @max > @size; end
      # True while the buffer holds at least one item.
      def pop?;  @size > 0;    end

    protected

      # Re-synchronises the tracked size with the number of items actually
      # stored in the underlying queue.
      def reset_custom_state
        @size = @queue.size
      end

      # Services pending operations until no further progress is possible.
      #
      # Starting from the first pending operation, each push is applied while
      # there is capacity and each pop while there is data. When the current
      # operation is blocked, the first pending operation of the opposite kind
      # is tried instead (a pop frees room for a blocked push, a push supplies
      # data for a blocked pop). The loop stops once nothing is left or the
      # current operation cannot be unblocked.
      #
      # @return [nil]
      def process
        # Nothing can progress: only pushes waiting on a full buffer, or only
        # pops waiting on an empty one.
        return if (pops.empty? && !push?) || (pushes.empty? && !pop?)

        operation = operations.first
        # With no pending operation and a partially filled buffer the guard
        # above does not trigger; bail out here rather than treating nil as a
        # Pop and calling +send+ on it.
        return unless operation

        loop do
          if operation.is_a?(Push)
            if push?
              operation.receive do |obj|
                @size += 1
                queue.push(obj)
              end
              operations.delete(operation)
              pushes.delete(operation)
            elsif pop? && (operation = pops[0])
              # Buffer is full: serve a waiting pop first to free a slot.
              next
            else
              break
            end
          else # Pop
            if pop?
              operation.send do
                @size -= 1
                queue.shift
              end
              operations.delete(operation)
              pops.delete(operation)
            elsif push? && (operation = pushes[0])
              # Buffer is empty: serve a waiting push first to supply data.
              next
            else
              break
            end
          end

          # Resume from the oldest operation that is still pending.
          operation = operations[0]
          break unless operation
        end
      end

    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
agent-0.11.0 lib/agent/queue/buffered.rb
agent-0.10.0 lib/agent/queue/buffered.rb
agent-0.9.1 lib/agent/queue/buffered.rb