Sha256: 7b0a3bf73334b61cf12116cb3630063745ef7c985939f9de0a1b3590b2eba424

Contents?: true

Size: 1.56 KB

Versions: 3

Compression:

Stored size: 1.56 KB

Contents

module Magent
  # A named message queue backed by a single document in the "channels"
  # collection of Magent.database. Pending work is kept in that document's
  # :messages array as [message, args] pairs; failures are written as
  # separate documents to the "errors" collection, tagged with the channel
  # name.
  class Channel
    # Binds to the channel called +name+, creating its backing document
    # (with empty :messages and :errors arrays) if none exists yet.
    def initialize(name)
      @name = name

      unless collection.find_one({:_id => @name}, {:fields => [:_id]})
        collection.save({:_id => @name, :messages => [], :errors => []})
      end
    end

    # Appends the pair [message, args] to the end of the channel's queue.
    # The update is an upsert so the push is not lost if the channel
    # document has been removed since this object was created.
    def enqueue(message, args)
      collection.update({:_id => @name}, {:$push => {:messages => [message, args]}}, :upsert => true)
    end

    # Records a failure for this channel. +info+ is stored as given, plus
    # :channel_id (the channel name) and :created_at (current UTC time).
    # The caller's hash is not modified.
    def failed(info)
      error_collection.save(info.merge({:channel_id => @name, :created_at => Time.now.utc}))
    end

    # Returns one page of the failures recorded for this channel, sorted by
    # "created_at".
    #
    # conds[:page]     - 1-based page number (default 1)
    # conds[:per_page] - page size (default 10)
    #
    # +conds+ is only read, never mutated. Any other keys in it are
    # currently ignored by the query.
    def errors(conds = {})
      page = conds[:page] || 1
      per_page = conds[:per_page] || 10

      error_collection.find({:channel_id => @name},
                            {:offset => (page - 1) * per_page, :limit => per_page, :sort => ["created_at"]})
    end

    # Removes and returns the oldest [message, args] pair of this channel by
    # running a small function on the database server. Returns whatever the
    # server hands back when the queue is empty or the channel document is
    # missing (no message in either case).
    def dequeue
      # The channel name is passed as a function argument instead of being
      # spliced into the script text, so names containing quotes or other
      # JavaScript syntax cannot alter the code that runs on the server.
      script = <<-'JS'
        function dequeue(name) {
          var selector = {_id: name};
          var q = db.channels.findOne(selector, {messages: 1});
          if (!q || !q.messages)
            return null;
          var m = q.messages[0];
          if (m)
            db.channels.update(selector, {$pop: {messages: -1}});
          return m;
        }
      JS

      Magent.database.eval(script, @name)
    end

    # The shared "channels" collection (see Channel.collection).
    def collection
      self.class.collection
    end

    # The "errors" collection, looked up once per Channel instance.
    def error_collection
      @error_collection ||= Magent.database.collection("errors")
    end

    # The "channels" collection, looked up once and cached on the class.
    def self.collection
      @collection ||= Magent.database.collection("channels")
    end

    # Returns the names (_id values) of all stored channels. When a block
    # is given, each name is also yielded to it.
    def self.all(&block)
      names = collection.find({}, :fields => [:_id]).map { |doc| doc["_id"] }
      names.each(&block) if block
      names
    end
  end # Channel
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
magent-0.1.0 lib/magent/channel.rb
magent-0.0.3 lib/magent/channel.rb
magent-0.0.2 lib/magent/channel.rb