Sha256: d4cada097095b94772cb8f2147782833f8bab19c9124fc27b8ae836a14ef8f18

Contents?: true

Size: 1.61 KB

Versions: 1

Compression:

Stored size: 1.61 KB

Contents

module Agent
  module Transport

    # A bounded, blocking FIFO buffer that is shared by name.
    #
    # Every instance created with the same +name+ operates on the same
    # underlying buffer, mutex and condition variable, which live in a
    # class-level registry. The capacity (+max+) is stored per instance, so
    # two instances with the same name but different +max+ values each apply
    # their own limit when pushing.
    class Queue

      # Shared state for every named queue, keyed by queue name.
      @@registry = {}

      # Maximum number of buffered items this instance allows before a
      # blocking push waits.
      attr_reader :max

      # name - registry key identifying the shared buffer.
      # max  - capacity used by this instance; must be greater than zero.
      #
      # Raises ArgumentError when +max+ is not positive.
      def initialize(name, max = 1)
        raise ArgumentError, "queue size must be positive" unless max > 0

        @name = name
        @max = max

        # Only the first instance for a given name creates the shared state;
        # later instances attach to what is already registered.
        @@registry[@name] ||= {
          :que => [],
          :wait => [],   # not used by anything in this class
          :mutex => Mutex.new,
          :cvar => ConditionVariable.new
        }
      end

      # Accessors into the shared registry entry for this queue's name.
      # After #close these raise NoMethodError (the entry is gone) until a
      # new Queue with the same name is constructed.
      def data;  @@registry[@name]; end
      def que;   data[:que];   end
      def wait;  data[:wait];  end
      def mutex; data[:mutex]; end
      def cvar;  data[:cvar];  end

      # Number of items currently buffered (read without taking the lock).
      def size; que.size; end
      alias length size

      # Appends +obj+, blocking while the buffer already holds +max+ items.
      def push(obj)
        enqueue(obj, false)
      end
      alias << push
      alias enq push

      # Removes and returns the oldest item, blocking while the buffer is
      # empty. Any arguments are accepted and ignored.
      def pop(*args)
        dequeue(false)
      end
      alias shift pop
      alias deq pop

      # True when this instance buffers more than one item.
      def async?; @max > 1; end

      # Pushes +msg+. With +nonblock+ set, raises ThreadError ("buffer full")
      # instead of waiting when the buffer is at capacity.
      #
      # NOTE: this shadows Object#send for instances of this class.
      def send(msg, nonblock = false)
        enqueue(msg, nonblock)
      end

      # Pops the oldest item. With +nonblock+ set, raises ThreadError
      # ("buffer empty") instead of waiting when nothing is buffered.
      def receive(nonblock = false)
        dequeue(nonblock)
      end

      # Removes the shared state for this queue's name from the registry.
      def close
        @@registry.delete @name
      end

      private

      # Adds +obj+ under the lock. The non-blocking capacity check happens
      # inside the same critical section as the insert, so another thread
      # cannot fill the buffer between the check and the push.
      def enqueue(obj, nonblock)
        mutex.synchronize do
          raise ThreadError, "buffer full" if nonblock && que.length >= @max

          cvar.wait(mutex) until que.length < @max

          que.push obj
          # Producers and consumers wait on the same condition variable, so
          # wake all of them: a single signal could be consumed by another
          # producer and leave a waiting consumer asleep.
          cvar.broadcast
        end
      end

      # Removes the oldest item under the lock. The non-blocking emptiness
      # check is performed while holding the lock for the same reason as in
      # #enqueue.
      def dequeue(nonblock)
        mutex.synchronize do
          raise ThreadError, "buffer empty" if nonblock && que.empty?

          cvar.wait(mutex) while que.empty?

          item = que.shift
          # See #enqueue: broadcast so a waiting producer is never starved
          # by the wakeup going to another consumer.
          cvar.broadcast

          item
        end
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
agent-0.1.0 lib/agent/transport/queue.rb