lib/amq/client/async/adapters/event_machine.rb in amq-client-1.0.0.pre2 vs lib/amq/client/async/adapters/event_machine.rb in amq-client-1.0.0

- old
+ new

@@ -29,14 +29,14 @@ # @option settings [String] :host ("127.0.0.1") Hostname AMQ broker runs on. # @option settings [String] :port (5672) Port AMQ broker listens on. # @option settings [String] :vhost ("/") Virtual host to use. # @option settings [String] :user ("guest") Username to use for authentication. # @option settings [String] :pass ("guest") Password to use for authentication. + # @option settings [String] :auth_mechanism ("PLAIN") SASL authentication mechanism to use. # @option settings [String] :ssl (false) Should be use TLS (SSL) for connection? # @option settings [String] :timeout (nil) Connection timeout. - # @option settings [String] :logging (false) Turns logging on or off. - # @option settings [String] :broker (nil) Broker name (use if you intend to use broker-specific features). + # @option settings [Fixnum] :heartbeat (0) Connection heartbeat, in seconds. # @option settings [Fixnum] :frame_max (131072) Maximum frame size to use. If broker cannot support frames this large, broker's maximum value will be used instead. # # @param [Hash] settings def self.connect(settings = {}, &block) @settings = Settings.configure(settings) @@ -162,20 +162,18 @@ } @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings| raise self.class.authentication_failure_exception_class.new(settings) } - @mechanism = "PLAIN" + @mechanism = @settings.fetch(:auth_mechanism, "PLAIN") @locale = @settings.fetch(:locale, "en_GB") @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new)) @auto_recovery = (!!@settings[:auto_recovery]) self.reset self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION) - - self.initialize_heartbeat_sender if self.heartbeats_enabled? end # initialize(*args) # For EventMachine adapter, this is a no-op. @@ -265,12 +263,10 @@ @had_successfully_connected_before = true @reconnecting = false @handling_skipped_hearbeats = false @last_server_heartbeat = Time.now - self.initialize_heartbeat_sender if self.heartbeat_interval > 0 - self.handshake end # @private def close_connection(*args) @@ -354,19 +350,24 @@ # # @api plugin def handle_skipped_hearbeats if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection @handling_skipped_hearbeats = true - @heartbeats_timer.cancel + self.cancel_heartbeat_sender self.run_skipped_heartbeats_callbacks end end # @private def initialize_heartbeat_sender @last_server_heartbeat = Time.now @heartbeats_timer = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat)) + end + + # @private + def cancel_heartbeat_sender + @heartbeats_timer.cancel if @heartbeats_timer end self.handle(Protocol::Connection::Start) do |connection, frame|