lib/amq/client/async/adapters/event_machine.rb in amq-client-0.7.0.alpha35 vs lib/amq/client/async/adapters/event_machine.rb in amq-client-0.8.0
- old
+ new
@@ -19,10 +19,12 @@
#
# API
#
+ # @group Connection operations
+
# Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection
# succeeds.
#
# @option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on.
# @option settings [String] :port (5672) Port AMQ broker listens on.
@@ -58,22 +60,49 @@
return
end
if !@reconnecting
@reconnecting = true
+ self.reset
+ end
- self.handle_connection_interruption
+ EventMachine.reconnect(@settings[:host], @settings[:port], self)
+ end
+
+ # Similar to #reconnect, but uses different connection settings
+ # @see #reconnect
+ # @api public
+ def reconnect_to(settings, period = 5)
+ if !@reconnecting
+ @reconnecting = true
self.reset
end
+ @settings = Settings.configure(settings)
EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
+ # Periodically try to reconnect.
+ #
+ # @param [Fixnum] period Period of time, in seconds, to wait before reconnection attempt.
+ # @param [Boolean] force If true, enforces immediate reconnection.
+ # @api public
+ def periodically_reconnect(period = 5)
+ @reconnecting = true
+ self.reset
+ @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
+ EventMachine.reconnect(@settings[:host], @settings[:port], self)
+ }
+ end
+ # @endgroup
+
+
+
# Defines a callback that will be executed when AMQP connection is considered open:
# client and broker has agreed on max channel identifier and maximum allowed frame
# size and authentication succeeds. You can define more than one callback.
#
# @see on_possible_authentication_failure
@@ -90,34 +119,10 @@
def on_closed(&block)
@disconnection_deferrable.callback(&block)
end # on_closed(&block)
alias on_disconnection on_closed
- # Defines a callback that will be run when initial TCP connection fails.
- # You can define only one callback.
- #
- # @api public
- def on_tcp_connection_failure(&block)
- @on_tcp_connection_failure = block
- end
-
- # Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted).
- # You can define only one callback.
- #
- # @api public
- def on_tcp_connection_loss(&block)
- @on_tcp_connection_loss = block
- end
-
- # Defines a callback that will be run when TCP connection is closed before authentication
- # finishes. Usually this means authentication failure. You can define only one callback.
- #
- # @api public
- def on_possible_authentication_failure(&block)
- @on_possible_authentication_failure = block
- end
-
# @see #on_open
# @private
def register_connection_callback(&block)
unless block.nil?
# delay calling block we were given till after we receive
@@ -160,10 +165,12 @@
@mechanism = "PLAIN"
@locale = @settings.fetch(:locale, "en_GB")
@client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))
+ @auto_recovery = (!!@settings[:auto_recovery])
+
self.reset
self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
if self.heartbeat_interval > 0
@last_server_heartbeat = Time.now
@@ -201,12 +208,10 @@
end # tcp_connection_established?
-
-
#
# Implementation
#
# Backwards compatibility with 0.7.0.a25. MK.
@@ -241,15 +246,26 @@
def connection_completed
# we only can safely set this value here because EventMachine is a lovely piece of
# software that calls #post_init before #unbind even when TCP connection
# fails. MK.
@tcp_connection_established = true
+ @periodic_reconnection_timer.cancel if @periodic_reconnection_timer
+
+
# again, this is because #unbind is called in different situations
# and there is no easy way to tell initial connection failure
# from connection loss. Not in EventMachine 0.12.x, anyway. MK.
- @had_successfull_connected_before = true
+ if @had_successfully_connected_before
+ @recovered = true
+
+
+ self.start_automatic_recovery
+ end
+
+ # now we can set it. MK.
+ @had_successfully_connected_before = true
@reconnecting = false
self.handshake
end
@@ -266,25 +282,25 @@
# * Our peer closes TCP connection down
# * There is a network connection issue
# * Initial TCP connection fails
# @private
def unbind(exception = nil)
- if !@tcp_connection_established && !@had_successfull_connected_before && !@intentionally_closing_connection
+ if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
@tcp_connection_failed = true
self.tcp_connection_failed
end
closing!
@tcp_connection_established = false
- self.handle_connection_interruption
+ self.handle_connection_interruption if @reconnecting
@disconnection_deferrable.succeed
closed!
- self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfull_connected_before
+ self.tcp_connection_lost if !@intentionally_closing_connection && @had_successfully_connected_before
# since AMQP spec dictates that authentication failure is a protocol exception
# and protocol exceptions result in connection closure, check whether we are
# in the authentication stage. If so, it is likely to signal an authentication
# issue. Java client behaves the same way. MK.
@@ -384,9 +400,9 @@
start_tls(tls_options)
elsif tls_options
start_tls
end
end # upgrade_to_tls_if_necessary
- end # EventMachineClient
+ end # EventMachineClient
end # Async
end # Client
end # AMQ