lib/magent/channel.rb in magent-0.0.1 vs lib/magent/channel.rb in magent-0.0.2
- old
+ new
@@ -2,32 +2,47 @@
class Channel
def initialize(name)
@name = name
if !collection.find_one({:_id => @name}, {:fields => [:_id]})
- collection.save({:_id => @name, :messages => []})
+ collection.save({:_id => @name, :messages => [], :errors => []})
end
end
def enqueue(message, args)
collection.update({:_id => @name}, {:$push => {:messages => [message, args]}}, :repsert => true)
end
+ def failed(info)
+ error_collection.save(info.merge({:channel_id => @name, :created_at => Time.now.utc}))
+ end
+
+ def errors(conds = {})
+ page = conds.delete(:page) || 1
+ per_page = conds.delete(:per_page) || 10
+
+ error_collection.find({:channel_id => @name}, {:offset => (page-1)*per_page, :limit => per_page, :sort => ["created_at"]})
+ end
+
def dequeue
Magent.database.eval(%@
function dequeue() {
- return db.eval(function() {
- var q = db.channels.findOne({_id: '#{@name}'});
- var m = q.messages.shift();
- db.channels.save(q); //slow
- return m;
- });
+ var selector = {_id: '#{@name}'};
+ var q = db.channels.findOne(selector, {messages: 1 });
+ var m = q.messages[0];
+ if(m)
+ db.channels.update(selector, { $pop: { messages : -1 } })
+ return m;
}
@)
end
def collection
self.class.collection
+ end
+
+ def error_collection
+ @error_collection ||= Magent.database.collection("errors")
end
def self.collection
@collection ||= Magent.database.collection("channels")
end