lib/cloudist/queue.rb in cloudist-0.4.3 vs lib/cloudist/queue.rb in cloudist-0.4.4
- old
+ new
@@ -1,116 +1,119 @@
module Cloudist
+ #
+ # NOTE: Queue is Deprecated, please use BasicQueue
+ #
class Queue
-
+
attr_reader :options, :name, :channel, :q, :ex
-
+
class_attribute :cached_queues
-
+
def initialize(name, options = {})
self.class.cached_queues ||= {}
-
+
options = {
:auto_delete => false,
:durable => true
}.update(options)
-
+
@name, @options = name, options
-
+
setup
p self.cached_queues.keys
-
+
log.debug(tag)
purge
end
-
+
def purge
q.purge
end
-
+
def inspect
"<#{self.class.name} name=#{name} exchange=#{ex ? ex.name : 'nil'}>"
end
-
+
def log
Cloudist.log
end
def tag
[].tap { |a|
a << "queue=#{q.name}" if q
a << "exchange=#{ex.name}" if ex
}.join(' ')
end
-
+
def publish(msg)
raise ArgumentError, "Publish expects a Cloudist::Message object" unless msg.is_a?(Cloudist::Message)
-
+
body, headers = msg.encoded
# EM.defer {
publish_to_ex(body, headers)
# }
-
+
p msg.body.to_hash
end
-
+
# def channel
# self.class.channel
# end
- #
+ #
# def q
# self.class.q
# end
- #
+ #
# def ex
# self.class.ex
# end
-
+
def publish_to_ex(body, headers = {})
ex.publish(body, headers)
end
-
+
def publish_to_q(body, headers = {})
q.publish(body, headers)
end
-
+
def teardown
q.unsubscribe
channel.close
log.debug "AMQP Unsubscribed: #{tag}"
end
def destroy
teardown
end
-
+
def subscribe(options = {}, &block)
options[:ack] = true
q.subscribe(options) do |queue_header, encoded_message|
request = Cloudist::Request.new(self, encoded_message, queue_header)
-
+
msg = Cloudist::Message.new(*request.for_message)
-
+
EM.defer do
begin
raise Cloudist::ExpiredMessage if request.expired?
yield msg
-
+
rescue Cloudist::ExpiredMessage
log.error "AMQP Message Timeout: #{tag} ttl=#{request.ttl} age=#{request.age}"
-
+
rescue Exception => e
Cloudist.handle_error(e)
-
+
ensure
request.ack unless Cloudist.closing?
end
end
end
end
-
+
private
-
+
def setup
if self.class.cached_queues.keys.include?(name.to_sym)
@q = self.class.cached_queues[name.to_sym][:q]
@ex = self.class.cached_queues[name.to_sym][:ex]
@channel = self.class.cached_queues[name.to_sym][:channel]
@@ -124,29 +127,29 @@
self.class.cached_queues[name.to_sym] = {:q => q, :ex => ex, :channel => channel}
end
setup_binding
end
-
+
def setup_channel
@channel = ::AMQP::Channel.new
-
+
# Set up QOS. If you do not do this then the subscribe in receive_message
# will get overwelmd and the whole thing will collapse in on itself.
channel.prefetch(1)
end
-
+
def setup_queue
@q = channel.queue(name, options)
end
-
+
def setup_exchange
@ex = channel.direct(name)
# setup_binding
end
-
+
def setup_binding
q.bind(ex)
end
-
+
end
end