lib/amq/client/async/adapters/event_machine.rb in amq-client-0.7.0.alpha35 vs lib/amq/client/async/adapters/event_machine.rb in amq-client-0.8.0

- old
+ new

@@ -19,10 +19,12 @@ # # API # + # @group Connection operations + # Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection # succeeds. # # @option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on. # @option settings [String] :port (5672) Port AMQ broker listens on. @@ -58,22 +60,49 @@ return end if !@reconnecting @reconnecting = true + self.reset + end - self.handle_connection_interruption + EventMachine.reconnect(@settings[:host], @settings[:port], self) + end + + # Similar to #reconnect, but uses different connection settings + # @see #reconnect + # @api public + def reconnect_to(settings, period = 5) + if !@reconnecting + @reconnecting = true self.reset end + @settings = Settings.configure(settings) EventMachine.reconnect(@settings[:host], @settings[:port], self) end + # Periodically try to reconnect. + # + # @param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt. + # @param [Boolean] force If true, enforces immediate reconnection. + # @api public + def periodically_reconnect(period = 5) + @reconnecting = true + self.reset + @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) { + EventMachine.reconnect(@settings[:host], @settings[:port], self) + } + end + # @endgroup + + + # Defines a callback that will be executed when AMQP connection is considered open: # client and broker has agreed on max channel identifier and maximum allowed frame # size and authentication succeeds. You can define more than one callback. # # @see on_possible_authentication_failure @@ -90,34 +119,10 @@ def on_closed(&block) @disconnection_deferrable.callback(&block) end # on_closed(&block) alias on_disconnection on_closed - # Defines a callback that will be run when initial TCP connection fails. - # You can define only one callback. - # - # @api public - def on_tcp_connection_failure(&block) - @on_tcp_connection_failure = block - end - - # Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). - # You can define only one callback. - # - # @api public - def on_tcp_connection_loss(&block) - @on_tcp_connection_loss = block - end - - # Defines a callback that will be run when TCP connection is closed before authentication - # finishes. Usually this means authentication failure. You can define only one callback. - # - # @api public - def on_possible_authentication_failure(&block) - @on_possible_authentication_failure = block - end - # @see #on_open # @private def register_connection_callback(&block) unless block.nil? # delay calling block we were given till after we receive @@ -160,10 +165,12 @@ @mechanism = "PLAIN" @locale = @settings.fetch(:locale, "en_GB") @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new)) + @auto_recovery = (!!@settings[:auto_recovery]) + self.reset self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION) if self.heartbeat_interval > 0 @last_server_heartbeat = Time.now @@ -201,12 +208,10 @@ end # tcp_connection_established? - - # # Implementation # # Backwards compatibility with 0.7.0.a25. MK. @@ -241,15 +246,26 @@ def connection_completed # we only can safely set this value here because EventMachine is a lovely piece of # software that calls #post_init before #unbind even when TCP connection # fails. MK. @tcp_connection_established = true + @periodic_reconnection_timer.cancel if @periodic_reconnection_timer + + # again, this is because #unbind is called in different situations # and there is no easy way to tell initial connection failure # from connection loss. Not in EventMachine 0.12.x, anyway. MK. - @had_successfull_connected_before = true + if @had_successfully_connected_before + @recovered = true + + + self.start_automatic_recovery + end + + # now we can set it. MK. + @had_successfully_connected_before = true @reconnecting = false self.handshake end @@ -266,25 +282,25 @@ # * Our peer closes TCP connection down # * There is a network connection issue # * Initial TCP connection fails # @private def unbind(exception = nil) - if !@tcp_connection_established && !@had_successfull_connected_before && !@intentionally_closing_connection + if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection @tcp_connection_failed = true self.tcp_connection_failed end closing! @tcp_connection_established = false - self.handle_connection_interruption + self.handle_connection_interruption if @reconnecting @disconnection_deferrable.succeed closed! - self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfull_connected_before + self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before # since AMQP spec dictates that authentication failure is a protocol exception # and protocol exceptions result in connection closure, check whether we are # in the authentication stage. If so, it is likely to signal an authentication # issue. Java client behaves the same way. MK. @@ -384,9 +400,9 @@ start_tls(tls_options) elsif tls_options start_tls end end # upgrade_to_tls_if_necessary - end # EventMachineClient + end # EventMachineClient end # Async end # Client end # AMQ