lib/cloudist/queues/basic_queue.rb in cloudist-0.4.4 vs lib/cloudist/queues/basic_queue.rb in cloudist-0.5.0

- old
+ new

@@ -1,50 +1,50 @@ module Cloudist class UnknownReplyTo < RuntimeError; end class ExpiredMessage < RuntimeError; end - + module Queues class BasicQueue attr_reader :queue_name, :options attr_reader :queue, :exchange, :channel, :prefetch - + alias :q :queue alias :ex :exchange alias :mq :channel - + def initialize(queue_name, options = {}) @prefetch ||= options.delete(:prefetch) || 1 - + options = { :auto_delete => true, :durable => false, :nowait => true }.update(options) @queue_name, @options = queue_name, options - + setup end - + def inspect "<#{self.class.name} queue_name=#{queue_name}>" end def setup return if @setup.eql?(true) - + @channel ||= AMQP::Channel.new(Cloudist.connection) do - channel.prefetch(self.prefetch, false) if self.prefetch.present? + channel.prefetch(self.prefetch, false) if self.prefetch end - + @queue = @channel.queue(queue_name, options) - + setup_exchange - + @setup = true end - + def setup_exchange @exchange = channel.direct("") end # def setup_exchange @@ -59,17 +59,17 @@ def tag s = "queue=#{queue.name}" s += " exchange=#{exchange.name}" if exchange s end - + def subscribe(&block) queue.subscribe(:ack => true) do |queue_header, encoded_message| # next if Cloudist.closing? request = Cloudist::Request.new(self, encoded_message, queue_header) - + handle_request = proc { begin raise Cloudist::ExpiredMessage if request.expired? # yield request if block_given? block.call(request) @@ -84,15 +84,15 @@ # unless Cloudist.closing? # finished = Time.now.utc.to_i # log.debug("Finished Job in #{finished - request.start} seconds") end } - + handle_ack = proc { request.ack } - + EM.defer(handle_request, handle_ack) end log.info "AMQP Subscribed: #{tag}" self end @@ -126,6 +126,6 @@ def destroy teardown end end end -end \ No newline at end of file +end