Sha256: 865dfe527d570d52d80c83c32845fafeb317f3d6127b6cc11f8d721b3637007c

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

module Magent
  # A named message queue stored as a single document in the "channels"
  # collection of Magent.database.
  #
  # The document is keyed by the channel name (+_id+) and holds the pending
  # messages in a +messages+ array plus a +message_count+ counter.
  class GenericChannel
    # Creates a handle on the channel called +name+, saving an empty channel
    # document when none with that +_id+ is found.
    #
    # @param name [Object] value used as the +_id+ of the channel document
    def initialize(name)
      @name = name

      unless collection.find_one({:_id => @name}, {:fields => [:_id]})
        collection.save({:_id => @name, :messages => []})
      end
    end

    # Appends +message+ to the channel and increments +message_count+.
    #
    # @param message [Object] payload pushed onto the +messages+ array
    # @return [Object] whatever the driver's +update+ call returns
    def enqueue(message)
      collection.update(
        {:_id => @name},
        {:$push => {:messages => message}, :$inc => {:message_count => 1}},
        # The driver's option key is :upsert; the previous :repsert key is not
        # an update option, so the document was never created on demand.
        :upsert => true
      )
    end

    # Reads the +message_count+ counter of the channel document. Within this
    # class the counter is only ever incremented (by #enqueue); #dequeue does
    # not decrement it.
    #
    # @return [Integer] the stored counter, or 0 when the document or the
    #   field is missing
    def message_count
      channel = collection.find_one({:_id => @name}, {:fields => [:message_count]})
      return 0 unless channel

      channel["message_count"] || 0
    end

    # Number of messages currently waiting in the channel, computed by a
    # JavaScript function handed to Magent.database.eval.
    #
    # The channel name is passed as a function argument rather than being
    # interpolated into the script, so names containing quotes or backslashes
    # can neither break the script nor inject code.
    #
    # @return [Object] the value returned by the evaluated function (the
    #   length of +messages+, or 0 when the document or array is missing)
    def queue_count
      Magent.database.eval(<<-JS, @name)
        function queue_count(name) {
          var selector = {_id: name};
          var q = db.channels.findOne(selector, {messages: 1});
          if (!q || !q.messages)
            return 0;
          return q.messages.length;
        }
      JS
    end

    # Removes and returns the oldest message of the channel, using a
    # JavaScript function handed to Magent.database.eval.
    #
    # Emptiness is decided by the array length, not by the truthiness of the
    # first element, so messages such as 0, "" or false are popped as well
    # instead of blocking the queue.
    #
    # NOTE(review): the find-then-update sequence is only atomic if the server
    # serialises evaluated scripts -- confirm for the deployed MongoDB version.
    #
    # @return [Object, nil] the first message, or nil (JavaScript null) when
    #   the channel is empty or missing
    def dequeue
      Magent.database.eval(<<-JS, @name)
        function dequeue(name) {
          var selector = {_id: name};
          var q = db.channels.findOne(selector, {messages: 1});
          if (!q || !q.messages || q.messages.length == 0)
            return null;
          var m = q.messages[0];
          db.channels.update(selector, { $pop: { messages : -1 } });
          return m;
        }
      JS
    end

    # Instance-level shortcut to the shared collection.
    #
    # @return [Object] the "channels" collection
    def collection
      self.class.collection
    end

    # Memoized handle on the "channels" collection of Magent.database.
    #
    # @return [Object] the "channels" collection
    def self.collection
      @collection ||= Magent.database.collection("channels")
    end

    # Lists the names (+_id+ values) of all channel documents.
    #
    # @yield [name] each channel name, in cursor order, when a block is given
    # @return [Array] the channel names
    def self.all(&block)
      collection.find({}, :fields => [:_id]).map do |doc|
        name = doc["_id"]
        yield name if block_given?
        name
      end
    end
  end # GenericChannel
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
magent-0.1.3 lib/magent/generic_channel.rb