lib/bunny/channel.rb in bunny-1.1.0.pre2 vs lib/bunny/channel.rb in bunny-1.1.0.rc1
- old
+ new
@@ -1279,10 +1279,11 @@
@connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
+ @tx_mode = true
@last_tx_select_ok
end
# Commits current transaction
@@ -1348,10 +1349,11 @@
@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_confirm_select_ok = wait_on_continuations
end
+ @confirm_mode = true
raise_if_continuation_resulted_in_a_channel_error!
@last_confirm_select_ok
end
# Blocks calling thread until confirms are received for all
@@ -1418,10 +1420,12 @@
def recover_from_network_failure
@logger.debug "Recovering channel #{@id} after network failure"
release_all_continuations
recover_prefetch_setting
+ recover_confirm_flag
+ recover_tx_mode
recover_exchanges
# this includes recovering bindings
recover_queues
recover_consumers
increment_recoveries_counter
@@ -1433,10 +1437,26 @@
# @api plugin
def recover_prefetch_setting
basic_qos(@prefetch_count) if @prefetch_count
end
+ # Recovers publisher confirms mode. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
+ def recover_confirm_mode
+ confirm_select if @confirm_mode
+ end
+
+ # Recovers transaction mode. Used by the Automatic Network Failure
+ # Recovery feature.
+ #
+ # @api plugin
+ def recover_tx_mode
+ tx_select if @tx_mode
+ end
+
# Recovers exchanges. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_exchanges
@@ -1679,11 +1699,11 @@
@basic_get_continuations.poll(@connection.continuation_timeout)
ensure
@threads_waiting_on_basic_get_continuations.delete(t)
end
else
- connection.event_loop.run_once until @basic_get_continuations.length > 0
+ connection.reader_loop.run_once until @basic_get_continuations.length > 0
@basic_get_continuations.pop
end
end
@@ -1697,10 +1717,10 @@
@confirms_continuations.poll(@connection.continuation_timeout)
ensure
@threads_waiting_on_confirms_continuations.delete(t)
end
else
- connection.event_loop.run_once until @confirms_continuations.length > 0
+ connection.reader_loop.run_once until @confirms_continuations.length > 0
@confirms_continuations.pop
end
end