Sha256: b8cd426f9e41305fe0b448226b1367a0933d9fbdf27f3ea57fe9e1c5ea652013
Contents?: true
Size: 1.52 KB
Versions: 1
Compression:
Stored size: 1.52 KB
Contents
require "agent/errors"

module Agent
  class Queue
    # A queue with a bounded buffer of +max+ slots.
    #
    # +size+ tracks how many objects are currently buffered. A push can be
    # serviced while +size+ is below +max+; a pop can be serviced while
    # +size+ is above zero.
    #
    # NOTE(review): +operations+, +pushes+, +pops+, +queue+ and the +Push+
    # constant are not defined in this file; presumably they come from the
    # Queue superclass and its namespace -- confirm against Queue.
    class Buffered < Queue
      attr_reader :size, :max

      # type - passed through unchanged to the superclass initializer.
      # max  - buffer capacity; must be at least 1 (default: 1).
      #
      # Raises Errors::InvalidQueueSize when +max+ is below 1.
      #
      # NOTE(review): @size is only assigned in #reset_custom_state;
      # presumably the superclass initializer invokes it -- confirm.
      def initialize(type, max=1)
        raise Errors::InvalidQueueSize, "queue size must be at least 1" unless max >= 1
        super(type)
        @max = max
      end

      def buffered?;   true;  end
      def unbuffered?; false; end

      # True while the buffer has at least one free slot.
      def push?; @max > @size; end

      # True while the buffer holds at least one object.
      def pop?;  @size > 0;    end

    protected

      # Re-derives the buffered-object count from the backing queue.
      def reset_custom_state
        @size = @queue.size
      end

      # Services pending operations against the buffer until no further
      # progress can be made.
      #
      # Starting from the first pending operation, a push is received into
      # the buffer when there is room and a pop is fed from the buffer when
      # it is non-empty. When the current operation cannot proceed, the
      # first pending operation of the opposite kind is tried instead; if
      # that is not possible either, processing stops.
      def process
        # Nothing can progress when the only thing that could happen is a
        # push into a full buffer or a pop from an empty one.
        return if (pops.empty? && !push?) || (pushes.empty? && !pop?)

        operation = operations.first

        # With no pending operations and a partially filled buffer the guard
        # above does not fire; without this check the nil operation would
        # fall into the pop branch below and raise.
        return unless operation

        loop do
          if operation.is_a?(Push)
            if push?
              # A closed push is dropped from the pending lists without
              # buffering anything.
              unless operation.closed?
                operation.receive do |obj|
                  @size += 1
                  queue.push(obj)
                end
              end
              operations.delete(operation)
              pushes.delete(operation)
            elsif pop? && operation = pops[0]
              # Buffer is full: switch to the first pending pop (assignment
              # in the condition is intentional).
              next
            else
              break
            end
          else # Pop
            if pop?
              operation.send do
                @size -= 1
                queue.shift
              end
              operations.delete(operation)
              pops.delete(operation)
            elsif push? && operation = pushes[0]
              # Buffer is empty: switch to the first pending push (assignment
              # in the condition is intentional).
              next
            else
              break
            end
          end

          operation = operations[0]
          break unless operation
        end
      end

    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
agent-0.12.0 | lib/agent/queue/buffered.rb |