lib/amqp/channel.rb in amqp-0.7.1 vs lib/amqp/channel.rb in amqp-0.7.2

- old
+ new

@@ -136,19 +136,25 @@ @_send_mutex = Mutex.new @get_queue_mutex = Mutex.new @connection = connection || AMQP.start + @queues_awaiting_declare_ok = Array.new + conn.callback { |c| @channel = c.add_channel(self) send Protocol::Channel::Open.new } end attr_reader :channel, :connection, :status alias :conn :connection + attr_reader :queues_awaiting_declare_ok + + + def closed? @status.eql?(:closed) end def open? @@ -238,10 +244,22 @@ else self.exchanges << Exchange.new(self, :direct, name, opts, &block) end end + # Returns exchange object with the same name as default (aka unnamed) exchange. + # Default exchange is a direct exchange and automatically routes messages to + # queues when routing key matches queue name exactly. This feature is known as + # "automatic binding" (of queues to default exchange). + # + # *Use default exchange when you want to route messages directly to specific queues* + # (queue names are known, you don't mind this kind of coupling between applications). + def default_exchange + Exchange.default(self) + end + + # Defines, intializes and returns an Exchange to act as an ingress # point for all published messages. # # == Fanout # A fanout exchange is useful for 1:N communication where one publisher @@ -607,18 +625,23 @@ # If set, the server will not respond to the method. The client should # not wait for a reply method. If the server could not complete the # method it will raise a channel or connection exception. # def queue(name, opts = {}, &block) - if queue = self.queues.find { |queue| queue.name == name } + raise ArgumentError, "queue name must not be nil. Use '' (empty string) for server-named queues." if name.nil? + + if name && !name.empty? && (queue = self.queues.find { |queue| queue.name == name }) extended_opts = Queue.add_default_options(name, opts, block) validate_parameters_match!(queue, extended_opts) queue else - self.queues << Queue.new(self, name, opts, &block) + q = Queue.new(self, name, opts, &block) + self.queues << q + + q end end def queue!(name, opts = {}, &block) self.queues.add! Queue.new(self, name, opts, &block) @@ -744,10 +767,13 @@ end def reset @deferred_status = nil @channel = nil + + @queues_awaiting_declare_ok = Array.new + initialize @connection @consumers = {} exs = @exchanges @@ -855,16 +881,13 @@ exchanges = self.exchanges.select { |exchange| exchange.opts[:nowait].eql?(false) } exchange = exchanges.reverse.find { |exchange| exchange.status.eql?(:unfinished) } exchange.receive_response method when Protocol::Queue::DeclareOk - # We can't use queues[method.queue] because if the name would - # be an empty string, then AMQP broker generated a random one. - queues = self.queues.select { |queue| queue.opts[:nowait].eql?(false) } - queue = queues.reverse.find { |queue| queue.status.eql?(:unfinished) } - queue.receive_status method + queue = @queues_awaiting_declare_ok.shift + queue.receive_status method when Protocol::Queue::BindOk # We can't use queues[method.queue] because if the name would # be an empty string, then AMQP broker generated a random one. queues = self.queues.select { |queue| queue.sync_bind } queue = queues.reverse.find { |queue| queue.status.eql?(:unbound) } @@ -933,5 +956,27 @@ end # Channel end # AMQP MQ = AMQP::Channel + +# +# Backwards compatibility with 0.6.x +# + +class MQ + # unique identifier + def MQ.id + Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}" + end + + def MQ.default + # TODO: clear this when connection is closed + Thread.current[:mq] ||= MQ.new + end + + # Allows for calls to all MQ instance methods. This implicitly calls + # MQ.new so that a new channel is allocated for subsequent operations. + def MQ.method_missing(meth, *args, &blk) + MQ.default.__send__(meth, *args, &blk) + end +end