lib/bunny/channel.rb in bunny-2.6.1 vs lib/bunny/channel.rb in bunny-2.6.2

- old
+ new

@@ -180,10 +180,13 @@ # synchronizes frameset delivery. MK. @publishing_mutex = @connection.mutex_impl.new @consumer_mutex = @connection.mutex_impl.new + @queue_mutex = @connection.mutex_impl.new + @exchange_mutex = @connection.mutex_impl.new + @unconfirmed_set_mutex = @connection.mutex_impl.new self.reset_continuations # threads awaiting on continuations. Used to unblock @@ -1514,21 +1517,21 @@ # Recovers exchanges. Used by the Automatic Network Failure # Recovery feature. # # @api plugin def recover_exchanges - @exchanges.values.dup.each do |x| + @exchange_mutex.synchronize { @exchanges.values }.each do |x| x.recover_from_network_failure end end # Recovers queues and bindings. Used by the Automatic Network Failure # Recovery feature. # # @api plugin def recover_queues - @queues.values.dup.each do |q| + @queue_mutex.synchronize { @queues.values }.each do |q| @logger.debug { "Recovering queue #{q.name}" } q.recover_from_network_failure end end @@ -1539,11 +1542,12 @@ def recover_consumers unless @consumers.empty? @work_pool = ConsumerWorkPool.new(@work_pool.size, @work_pool.abort_on_exception) @work_pool.start end - @consumers.values.dup.each do |c| + + @consumer_mutex.synchronize { @consumers.values }.each do |c| c.recover_from_network_failure end end # @private @@ -1859,40 +1863,40 @@ @connection.read_next_frame(options = {}) end # @private def deregister_queue(queue) - @queues.delete(queue.name) + @queue_mutex.synchronize { @queues.delete(queue.name) } end # @private def deregister_queue_named(name) - @queues.delete(name) + @queue_mutex.synchronize { @queues.delete(name) } end # @private def register_queue(queue) - @queues[queue.name] = queue + @queue_mutex.synchronize { @queues[queue.name] = queue } end # @private def find_queue(name) - @queues[name] + @queue_mutex.synchronize { @queues[name] } end # @private def deregister_exchange(exchange) - @exchanges.delete(exchange.name) + @exchange_mutex.synchronize { @exchanges.delete(exchange.name) } end # @private def register_exchange(exchange) - @exchanges[exchange.name] = exchange + @exchange_mutex.synchronize { @exchanges[exchange.name] = exchange } end # @private def find_exchange(name) - @exchanges[name] + @exchange_mutex.synchronize { @exchanges[name] } end protected # @private