lib/amqp/channel.rb in amqp-0.9.5 vs lib/amqp/channel.rb in amqp-0.9.6
- old
+ new
@@ -225,10 +225,12 @@
# ...
#
# Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
@channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
+ @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}
+
# only send channel.open when connection is actually open. Makes it possible to
# do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
@connection.on_connection do
self.open do |ch, open_ok|
@channel_is_open_deferrable.succeed
@@ -377,11 +379,11 @@
# @api public
def direct(name = 'amq.direct', opts = {}, &block)
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:direct, name, opts, block)
- validate_parameters_match!(exchange, extended_opts)
+ validate_parameters_match!(exchange, extended_opts, :exchange)
block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :direct, name, opts, &block))
@@ -485,11 +487,11 @@
# @api public
def fanout(name = 'amq.fanout', opts = {}, &block)
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:fanout, name, opts, block)
- validate_parameters_match!(exchange, extended_opts)
+ validate_parameters_match!(exchange, extended_opts, :exchange)
block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :fanout, name, opts, &block))
@@ -601,11 +603,11 @@
# @api public
def topic(name = 'amq.topic', opts = {}, &block)
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:topic, name, opts, block)
- validate_parameters_match!(exchange, extended_opts)
+ validate_parameters_match!(exchange, extended_opts, :exchange)
block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :topic, name, opts, &block))
@@ -707,11 +709,11 @@
# @api public
def headers(name = 'amq.match', opts = {}, &block)
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:headers, name, opts, block)
- validate_parameters_match!(exchange, extended_opts)
+ validate_parameters_match!(exchange, extended_opts, :exchange)
block.call(exchange) if block
exchange
else
register_exchange(Exchange.new(self, :headers, name, opts, &block))
@@ -810,11 +812,11 @@
raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?
if name && !name.empty? && (queue = find_queue(name))
extended_opts = Queue.add_default_options(name, opts, block)
- validate_parameters_match!(queue, extended_opts)
+ validate_parameters_match!(queue, extended_opts, :queue)
block.call(queue) if block
queue
else
self.queue!(name, opts, &block)
@@ -1264,17 +1266,15 @@
def self.method_missing(meth, *args, &blk)
self.default.__send__(meth, *args, &blk)
end
-
protected
- # @private
- def validate_parameters_match!(entity, parameters)
- parameters.delete(:no_declare)
- unless entity.opts == parameters || parameters[:passive]
+ @private
+ def validate_parameters_match!(entity, parameters, type)
+ unless entity.opts.values_at(*@parameter_checks[type]) == parameters.values_at(*@parameter_checks[type]) || parameters[:passive]
raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
end
- end # validate_parameters_match!(entity, parameters)
+ end # validate_parameters_match!(entity, parameters, type)
end # Channel
end # AMQP