lib/bunny/channel.rb in bunny-2.6.1 vs lib/bunny/channel.rb in bunny-2.6.2
- old
+ new
@@ -180,10 +180,13 @@
# synchronizes frameset delivery. MK.
@publishing_mutex = @connection.mutex_impl.new
@consumer_mutex = @connection.mutex_impl.new
+ @queue_mutex = @connection.mutex_impl.new
+ @exchange_mutex = @connection.mutex_impl.new
+
@unconfirmed_set_mutex = @connection.mutex_impl.new
self.reset_continuations
# threads awaiting on continuations. Used to unblock
@@ -1514,21 +1517,21 @@
# Recovers exchanges. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_exchanges
- @exchanges.values.dup.each do |x|
+ @exchange_mutex.synchronize { @exchanges.values }.each do |x|
x.recover_from_network_failure
end
end
# Recovers queues and bindings. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_queues
- @queues.values.dup.each do |q|
+ @queue_mutex.synchronize { @queues.values }.each do |q|
@logger.debug { "Recovering queue #{q.name}" }
q.recover_from_network_failure
end
end
@@ -1539,11 +1542,12 @@
def recover_consumers
unless @consumers.empty?
@work_pool = ConsumerWorkPool.new(@work_pool.size, @work_pool.abort_on_exception)
@work_pool.start
end
- @consumers.values.dup.each do |c|
+
+ @consumer_mutex.synchronize { @consumers.values }.each do |c|
c.recover_from_network_failure
end
end
# @private
@@ -1859,40 +1863,40 @@
@connection.read_next_frame(options = {})
end
# @private
def deregister_queue(queue)
- @queues.delete(queue.name)
+ @queue_mutex.synchronize { @queues.delete(queue.name) }
end
# @private
def deregister_queue_named(name)
- @queues.delete(name)
+ @queue_mutex.synchronize { @queues.delete(name) }
end
# @private
def register_queue(queue)
- @queues[queue.name] = queue
+ @queue_mutex.synchronize { @queues[queue.name] = queue }
end
# @private
def find_queue(name)
- @queues[name]
+ @queue_mutex.synchronize { @queues[name] }
end
# @private
def deregister_exchange(exchange)
- @exchanges.delete(exchange.name)
+ @exchange_mutex.synchronize { @exchanges.delete(exchange.name) }
end
# @private
def register_exchange(exchange)
- @exchanges[exchange.name] = exchange
+ @exchange_mutex.synchronize { @exchanges[exchange.name] = exchange }
end
# @private
def find_exchange(name)
- @exchanges[name]
+ @exchange_mutex.synchronize { @exchanges[name] }
end
protected
# @private