#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#++

module Qpid::Proton::Reactor

  # Event handler that binds a fresh transport to a connection when the
  # connection is opened locally, and re-binds one after the transport
  # closes if a reconnect strategy has been supplied.
  class Connector < Qpid::Proton::BaseHandler

    # Source of target URLs. #connect calls +next+ on it and reads +host+,
    # +port+, +scheme+, +username+ and +password+ from the result.
    attr_accessor :address

    # Value assigned to the transport's +idle_timeout+ when not nil.
    # NOTE(review): units are whatever Transport#idle_timeout= expects;
    # confirm against the Transport API.
    attr_accessor :heartbeat

    # Optional reconnect strategy. Must respond to +reset+ and to +next+
    # (which returns the delay before the next attempt; 0 means retry now).
    # When nil, a closed transport is not retried.
    attr_accessor :reconnect

    # Optional SSL domain handed to Qpid::Proton::SSL.new for "amqps" URLs.
    attr_accessor :ssl_domain

    # @param connection the connection this connector manages.
    def initialize(connection)
      @connection = connection
      @address = nil
      @heartbeat = nil
      @reconnect = nil
      @ssl_domain = nil
    end

    # Starts the first connection attempt for the connection carried by the
    # event.
    def on_connection_local_open(event)
      connect(event.connection)
    end

    # The peer accepted the connection: restart the reconnect strategy so a
    # later failure begins from its initial state.
    def on_connection_remote_open(event)
      return if @reconnect.nil?

      @reconnect.reset
      @transport = nil
    end

    # A closed transport tail is handled exactly like a closed transport.
    def on_transport_tail_closed(event)
      on_transport_closed(event)
    end

    # Retries the connection if it is still locally active and a reconnect
    # strategy is configured; otherwise drops the reference to it.
    def on_transport_closed(event)
      return if @connection.nil?
      # Nothing to do once the local side is no longer active.
      return if (@connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?

      if @reconnect.nil?
        @connection = nil
        return
      end

      # Detach the dead transport before #connect binds a new one.
      event.transport.unbind
      delay = @reconnect.next
      if delay == 0
        connect(@connection)
      else
        # This handler receives on_timer_task when the delay elapses.
        event.reactor.schedule(delay, self)
      end
    end

    # Fired by the reactor for the task scheduled in #on_transport_closed.
    def on_timer_task(event)
      connect(@connection)
    end

    # The peer closed the connection: forget it so it is not retried.
    def on_connection_remote_close(event)
      @connection = nil
    end

    # Binds a new transport to +connection+ using the next URL from
    # #address, then applies the heartbeat, SSL and SASL settings.
    #
    # The three settings are independent of one another: a heartbeat does
    # not suppress SSL or SASL, and SSL does not suppress SASL credentials.
    #
    # @param connection the connection to bind a new transport to.
    def connect(connection)
      url = @address.next
      connection.hostname = "#{url.host}:#{url.port}"

      transport = Qpid::Proton::Transport.new
      transport.bind(connection)

      transport.idle_timeout = @heartbeat unless @heartbeat.nil?

      if (url.scheme == "amqps") && !@ssl_domain.nil?
        @ssl = Qpid::Proton::SSL.new(transport, @ssl_domain)
        @ssl.peer_hostname = url.host
      end

      unless url.username.nil?
        sasl = transport.sasl
        if url.username == "anonymous"
          sasl.mechanisms("ANONYMOUS")
        else
          sasl.plain(url.username, url.password)
        end
      end
    end

  end

end