lib/magent/channel.rb in magent-0.1.0 vs lib/magent/channel.rb in magent-0.1.1

- old
+ new

@@ -1,17 +1,9 @@ module Magent - class Channel - def initialize(name) - @name = name - - if !collection.find_one({:_id => @name}, {:fields => [:_id]}) - collection.save({:_id => @name, :messages => [], :errors => []}) - end - end - + class Channel < GenericChannel def enqueue(message, args) - collection.update({:_id => @name}, {:$push => {:messages => [message, args]}}, :repsert => true) + super([message, args]) end def failed(info) error_collection.save(info.merge({:channel_id => @name, :created_at => Time.now.utc})) end @@ -21,40 +13,10 @@ per_page = conds.delete(:per_page) || 10 error_collection.find({:channel_id => @name}, {:offset => (page-1)*per_page, :limit => per_page, :sort => ["created_at"]}) end - def dequeue - Magent.database.eval(%@ - function dequeue() { - var selector = {_id: '#{@name}'}; - var q = db.channels.findOne(selector, {messages: 1 }); - var m = q.messages[0]; - if(m) - db.channels.update(selector, { $pop: { messages : -1 } }) - return m; - } - @) - end - - def collection - self.class.collection - end - def error_collection @error_collection ||= Magent.database.collection("errors") - end - - def self.collection - @collection ||= Magent.database.collection("channels") - end - - def self.all(&block) - cursor = collection.find({}, :fields => [:_id]) - if block_given? - cursor.map {|c| name = c["_id"]; yield name; name } - else - cursor.map {|c| c["_id"] } - end end end # Channel end