Sha256: 398de8170801eed0a7505b486b5c8f6e13fbd6ecf907e50caef4d68aa800a6aa

Contents?: true

Size: 1.04 KB

Versions: 2

Compression:

Stored size: 1.04 KB

Contents

module Magent
  class GenericChannel
    include Magent::Failure
    include Magent::Stats

    attr_reader :name
    attr_reader :current_job

    def initialize(name)
      @name = "magent.#{name}"
    end

    def enqueue(message, priority = 3)
      collection.save({:_id => generate_uid, :message => message, :priority => priority, :created_at => Time.now.to_i})
    end

    def message_count
      collection.count # TODO: number of processed messages (create a collection for stats)
    end

    def queue_count
      collection.count
    end

    def dequeue
      if @current_job = self.next_message
        @current_job["message"]
      end
    end

    def next_message
      collection.find_and_modify(:sort => [[:priority, Mongo::ASCENDING], [:created_at, Mongo::DESCENDING]],
                                 :remove => true) rescue {}
    end

    def collection
      @collection ||= Magent.database.collection(@name)
    end

    protected
    def generate_uid
      UUIDTools::UUID.random_create.hexdigest
    end
  end # GenericChannel
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
magent-0.6.2 lib/magent/generic_channel.rb
magent-0.6.1 lib/magent/generic_channel.rb