lib/amq/client/async/adapters/event_machine.rb in amq-client-1.0.0.pre2 vs lib/amq/client/async/adapters/event_machine.rb in amq-client-1.0.0
- old
+ new
@@ -29,14 +29,14 @@
# @option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on.
# @option settings [String] :port (5672) Port AMQ broker listens on.
# @option settings [String] :vhost ("/") Virtual host to use.
# @option settings [String] :user ("guest") Username to use for authentication.
# @option settings [String] :pass ("guest") Password to use for authentication.
+ # @option settings [String] :auth_mechanism ("PLAIN") SASL authentication mechanism to use.
# @option settings [String] :ssl (false) Should be use TLS (SSL) for connection?
# @option settings [String] :timeout (nil) Connection timeout.
- # @option settings [String] :logging (false) Turns logging on or off.
- # @option settings [String] :broker (nil) Broker name (use if you intend to use broker-specific features).
+ # @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds.
# @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead.
#
# @param [Hash] settings
def self.connect(settings = {}, &block)
@settings = Settings.configure(settings)
@@ -162,20 +162,18 @@
}
@on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
raise self.class.authentication_failure_exception_class.new(settings)
}
- @mechanism = "PLAIN"
+ @mechanism = @settings.fetch(:auth_mechanism, "PLAIN")
@locale = @settings.fetch(:locale, "en_GB")
@client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))
@auto_recovery = (!!@settings[:auto_recovery])
self.reset
self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
-
- self.initialize_heartbeat_sender if self.heartbeats_enabled?
end # initialize(*args)
# For EventMachine adapter, this is a no-op.
@@ -265,12 +263,10 @@
@had_successfully_connected_before = true
@reconnecting = false
@handling_skipped_hearbeats = false
@last_server_heartbeat = Time.now
- self.initialize_heartbeat_sender if self.heartbeat_interval > 0
-
self.handshake
end
# @private
def close_connection(*args)
@@ -354,19 +350,24 @@
#
# @api plugin
def handle_skipped_hearbeats
if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
@handling_skipped_hearbeats = true
- @heartbeats_timer.cancel
+ self.cancel_heartbeat_sender
self.run_skipped_heartbeats_callbacks
end
end
# @private
def initialize_heartbeat_sender
@last_server_heartbeat = Time.now
@heartbeats_timer = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
+ end
+
+ # @private
+ def cancel_heartbeat_sender
+ @heartbeats_timer.cancel if @heartbeats_timer
end
self.handle(Protocol::Connection::Start) do |connection, frame|