lib/cloudist/queues/basic_queue.rb in cloudist-0.4.4 vs lib/cloudist/queues/basic_queue.rb in cloudist-0.5.0
- old
+ new
@@ -1,50 +1,50 @@
module Cloudist
class UnknownReplyTo < RuntimeError; end
class ExpiredMessage < RuntimeError; end
-
+
module Queues
class BasicQueue
attr_reader :queue_name, :options
attr_reader :queue, :exchange, :channel, :prefetch
-
+
alias :q :queue
alias :ex :exchange
alias :mq :channel
-
+
def initialize(queue_name, options = {})
@prefetch ||= options.delete(:prefetch) || 1
-
+
options = {
:auto_delete => true,
:durable => false,
:nowait => true
}.update(options)
@queue_name, @options = queue_name, options
-
+
setup
end
-
+
def inspect
"<#{self.class.name} queue_name=#{queue_name}>"
end
def setup
return if @setup.eql?(true)
-
+
@channel ||= AMQP::Channel.new(Cloudist.connection) do
- channel.prefetch(self.prefetch, false) if self.prefetch.present?
+ channel.prefetch(self.prefetch, false) if self.prefetch
end
-
+
@queue = @channel.queue(queue_name, options)
-
+
setup_exchange
-
+
@setup = true
end
-
+
def setup_exchange
@exchange = channel.direct("")
end
# def setup_exchange
@@ -59,17 +59,17 @@
def tag
s = "queue=#{queue.name}"
s += " exchange=#{exchange.name}" if exchange
s
end
-
+
def subscribe(&block)
queue.subscribe(:ack => true) do |queue_header, encoded_message|
# next if Cloudist.closing?
request = Cloudist::Request.new(self, encoded_message, queue_header)
-
+
handle_request = proc {
begin
raise Cloudist::ExpiredMessage if request.expired?
# yield request if block_given?
block.call(request)
@@ -84,15 +84,15 @@
# unless Cloudist.closing?
# finished = Time.now.utc.to_i
# log.debug("Finished Job in #{finished - request.start} seconds")
end
}
-
+
handle_ack = proc {
request.ack
}
-
+
EM.defer(handle_request, handle_ack)
end
log.info "AMQP Subscribed: #{tag}"
self
end
@@ -126,6 +126,6 @@
def destroy
teardown
end
end
end
-end
\ No newline at end of file
+end