lib/bunny/channel.rb in bunny-1.1.0.pre2 vs lib/bunny/channel.rb in bunny-1.1.0.rc1

- old
+ new

@@ -1279,10 +1279,11 @@ @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id)) Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do @last_tx_select_ok = wait_on_continuations end raise_if_continuation_resulted_in_a_channel_error! + @tx_mode = true @last_tx_select_ok end # Commits current transaction @@ -1348,10 +1349,11 @@ @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false)) Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do @last_confirm_select_ok = wait_on_continuations end + @confirm_mode = true raise_if_continuation_resulted_in_a_channel_error! @last_confirm_select_ok end # Blocks calling thread until confirms are received for all @@ -1418,10 +1420,12 @@ def recover_from_network_failure @logger.debug "Recovering channel #{@id} after network failure" release_all_continuations recover_prefetch_setting + recover_confirm_flag + recover_tx_mode recover_exchanges # this includes recovering bindings recover_queues recover_consumers increment_recoveries_counter @@ -1433,10 +1437,26 @@ # @api plugin def recover_prefetch_setting basic_qos(@prefetch_count) if @prefetch_count end + # Recovers publisher confirms mode. Used by the Automatic Network Failure + # Recovery feature. + # + # @api plugin + def recover_confirm_mode + confirm_select if @confirm_mode + end + + # Recovers transaction mode. Used by the Automatic Network Failure + # Recovery feature. + # + # @api plugin + def recover_tx_mode + tx_select if @tx_mode + end + # Recovers exchanges. Used by the Automatic Network Failure # Recovery feature. # # @api plugin def recover_exchanges @@ -1679,11 +1699,11 @@ @basic_get_continuations.poll(@connection.continuation_timeout) ensure @threads_waiting_on_basic_get_continuations.delete(t) end else - connection.event_loop.run_once until @basic_get_continuations.length > 0 + connection.reader_loop.run_once until @basic_get_continuations.length > 0 @basic_get_continuations.pop end end @@ -1697,10 +1717,10 @@ @confirms_continuations.poll(@connection.continuation_timeout) ensure @threads_waiting_on_confirms_continuations.delete(t) end else - connection.event_loop.run_once until @confirms_continuations.length > 0 + connection.reader_loop.run_once until @confirms_continuations.length > 0 @confirms_continuations.pop end end