lib/amqp/channel.rb in amqp-0.7.1 vs lib/amqp/channel.rb in amqp-0.7.2
- old
+ new
@@ -136,19 +136,25 @@
@_send_mutex = Mutex.new
@get_queue_mutex = Mutex.new
@connection = connection || AMQP.start
+ @queues_awaiting_declare_ok = Array.new
+
conn.callback { |c|
@channel = c.add_channel(self)
send Protocol::Channel::Open.new
}
end
attr_reader :channel, :connection, :status
alias :conn :connection
+ attr_reader :queues_awaiting_declare_ok
+
+
+
def closed?
@status.eql?(:closed)
end
def open?
@@ -238,10 +244,22 @@
else
self.exchanges << Exchange.new(self, :direct, name, opts, &block)
end
end
+ # Returns exchange object with the same name as default (aka unnamed) exchange.
+ # Default exchange is a direct exchange and automatically routes messages to
+ # queues when routing key matches queue name exactly. This feature is known as
+ # "automatic binding" (of queues to default exchange).
+ #
+ # *Use default exchange when you want to route messages directly to specific queues*
+ # (queue names are known, you don't mind this kind of coupling between applications).
+ def default_exchange
+ Exchange.default(self)
+ end
+
+
# Defines, intializes and returns an Exchange to act as an ingress
# point for all published messages.
#
# == Fanout
# A fanout exchange is useful for 1:N communication where one publisher
@@ -607,18 +625,23 @@
# If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
def queue(name, opts = {}, &block)
- if queue = self.queues.find { |queue| queue.name == name }
+ raise ArgumentError, "queue name must not be nil. Use '' (empty string) for server-named queues." if name.nil?
+
+ if name && !name.empty? && (queue = self.queues.find { |queue| queue.name == name })
extended_opts = Queue.add_default_options(name, opts, block)
validate_parameters_match!(queue, extended_opts)
queue
else
- self.queues << Queue.new(self, name, opts, &block)
+ q = Queue.new(self, name, opts, &block)
+ self.queues << q
+
+ q
end
end
def queue!(name, opts = {}, &block)
self.queues.add! Queue.new(self, name, opts, &block)
@@ -744,10 +767,13 @@
end
def reset
@deferred_status = nil
@channel = nil
+
+ @queues_awaiting_declare_ok = Array.new
+
initialize @connection
@consumers = {}
exs = @exchanges
@@ -855,16 +881,13 @@
exchanges = self.exchanges.select { |exchange| exchange.opts[:nowait].eql?(false) }
exchange = exchanges.reverse.find { |exchange| exchange.status.eql?(:unfinished) }
exchange.receive_response method
when Protocol::Queue::DeclareOk
- # We can't use queues[method.queue] because if the name would
- # be an empty string, then AMQP broker generated a random one.
- queues = self.queues.select { |queue| queue.opts[:nowait].eql?(false) }
- queue = queues.reverse.find { |queue| queue.status.eql?(:unfinished) }
- queue.receive_status method
+ queue = @queues_awaiting_declare_ok.shift
+ queue.receive_status method
when Protocol::Queue::BindOk
# We can't use queues[method.queue] because if the name would
# be an empty string, then AMQP broker generated a random one.
queues = self.queues.select { |queue| queue.sync_bind }
queue = queues.reverse.find { |queue| queue.status.eql?(:unbound) }
@@ -933,5 +956,27 @@
end # Channel
end # AMQP
MQ = AMQP::Channel
+
+#
+# Backwards compatibility with 0.6.x
+#
+
+class MQ
+ # unique identifier
+ def MQ.id
+ Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
+ end
+
+ def MQ.default
+ # TODO: clear this when connection is closed
+ Thread.current[:mq] ||= MQ.new
+ end
+
+ # Allows for calls to all MQ instance methods. This implicitly calls
+ # MQ.new so that a new channel is allocated for subsequent operations.
+ def MQ.method_missing(meth, *args, &blk)
+ MQ.default.__send__(meth, *args, &blk)
+ end
+end