lib/amqp/channel.rb in amqp-0.9.5 vs lib/amqp/channel.rb in amqp-0.9.6

- old
+ new

@@ -225,10 +225,12 @@ # ... # # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK. @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new + @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]} + # only send channel.open when connection is actually open. Makes it possible to # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK. @connection.on_connection do self.open do |ch, open_ok| @channel_is_open_deferrable.succeed @@ -377,11 +379,11 @@ # @api public def direct(name = 'amq.direct', opts = {}, &block) if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:direct, name, opts, block) - validate_parameters_match!(exchange, extended_opts) + validate_parameters_match!(exchange, extended_opts, :exchange) block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :direct, name, opts, &block)) @@ -485,11 +487,11 @@ # @api public def fanout(name = 'amq.fanout', opts = {}, &block) if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:fanout, name, opts, block) - validate_parameters_match!(exchange, extended_opts) + validate_parameters_match!(exchange, extended_opts, :exchange) block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :fanout, name, opts, &block)) @@ -601,11 +603,11 @@ # @api public def topic(name = 'amq.topic', opts = {}, &block) if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:topic, name, opts, block) - validate_parameters_match!(exchange, extended_opts) + validate_parameters_match!(exchange, extended_opts, :exchange) block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :topic, name, opts, &block)) @@ -707,11 +709,11 @@ # @api public def headers(name = 'amq.match', opts = {}, &block) if exchange = find_exchange(name) extended_opts = Exchange.add_default_options(:headers, name, opts, block) - validate_parameters_match!(exchange, extended_opts) + validate_parameters_match!(exchange, extended_opts, :exchange) block.call(exchange) if block exchange else register_exchange(Exchange.new(self, :headers, name, opts, &block)) @@ -810,11 +812,11 @@ raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? if name && !name.empty? && (queue = find_queue(name)) extended_opts = Queue.add_default_options(name, opts, block) - validate_parameters_match!(queue, extended_opts) + validate_parameters_match!(queue, extended_opts, :queue) block.call(queue) if block queue else self.queue!(name, opts, &block) @@ -1264,17 +1266,15 @@ def self.method_missing(meth, *args, &blk) self.default.__send__(meth, *args, &blk) end - protected - # @private - def validate_parameters_match!(entity, parameters) - parameters.delete(:no_declare) - unless entity.opts == parameters || parameters[:passive] + @private + def validate_parameters_match!(entity, parameters, type) + unless entity.opts.values_at(*@parameter_checks[type]) == parameters.values_at(*@parameter_checks[type]) || parameters[:passive] raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters) end - end # validate_parameters_match!(entity, parameters) + end # validate_parameters_match!(entity, parameters, type) end # Channel end # AMQP