Sha256: b8cd426f9e41305fe0b448226b1367a0933d9fbdf27f3ea57fe9e1c5ea652013

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

require "agent/errors"

module Agent
  class Queue
    # A queue variant that holds up to +max+ objects in its buffer.
    #
    # Pending push and pop operations are matched against the buffer by
    # #process: a push is accepted while there is free capacity, a pop is
    # served while the buffer is non-empty.
    class Buffered < Queue
      # size - count of buffered objects as tracked by this class
      # max  - capacity given to the constructor
      attr_reader :size, :max

      # type - forwarded unchanged to Queue#initialize
      # max  - buffer capacity; must be >= 1 (defaults to 1)
      #
      # Raises Errors::InvalidQueueSize when max is smaller than 1.
      def initialize(type, max = 1)
        raise Errors::InvalidQueueSize, "queue size must be at least 1" unless max >= 1

        super(type)
        @max = max
      end

      def buffered?
        true
      end

      def unbuffered?
        false
      end

      # True while the buffer has room for at least one more object.
      def push?
        @max > @size
      end

      # True while at least one object is buffered.
      def pop?
        @size > 0
      end

    protected

      # Re-synchronises the size counter with the underlying queue contents.
      def reset_custom_state
        @size = @queue.size
      end

      # Works through the pending operations until no further progress can be
      # made. Returns nil.
      #
      # Starting with the oldest pending operation:
      # * a Push is accepted when there is capacity; when the buffer is full
      #   the oldest pending pop (if any) is served first to make room;
      # * anything else is treated as a Pop and is served when the buffer is
      #   non-empty; when it is empty the oldest pending push (if any) is
      #   accepted first to supply an object.
      #
      # A call with no pending operations is a no-op, including when the
      # buffer is partially filled.
      def process
        # Nothing can move: either the buffer is full with nobody popping, or
        # it is empty with nobody pushing.
        return if (pops.empty? && !push?) || (pushes.empty? && !pop?)

        operation = operations.first

        # `while` (rather than an unconditional loop) covers the case where
        # there is no pending operation at all, e.g. a partially filled buffer
        # whose operation list is empty.
        while operation
          if operation.is_a?(Push)
            if push?
              # A closed push is dropped without enqueuing its object.
              unless operation.closed?
                # The counter is updated inside the block so that it only
                # changes if the operation actually yields its object.
                operation.receive do |obj|
                  @size += 1
                  queue.push(obj)
                end
              end
              operations.delete(operation)
              pushes.delete(operation)
            elsif pop? && (operation = pops.first)
              # Buffer is full: serve a waiting pop first, then come back.
              next
            else
              break
            end
          else # Pop
            if pop?
              # The block's value is the oldest buffered object; the counter
              # only changes if the operation actually runs the block.
              operation.send do
                @size -= 1
                queue.shift
              end
              operations.delete(operation)
              pops.delete(operation)
            elsif push? && (operation = pushes.first)
              # Buffer is empty: accept a waiting push first, then come back.
              next
            else
              break
            end
          end

          operation = operations.first
        end
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
agent-0.12.0 lib/agent/queue/buffered.rb