lib/reactor/connector.rb in qpid_proton-0.17.0 vs lib/reactor/connector.rb in qpid_proton-0.18.0

- old
+ new

@@ -19,31 +19,36 @@ module Qpid::Proton::Reactor class Connector < Qpid::Proton::BaseHandler - attr_accessor :address - attr_accessor :reconnect - attr_accessor :ssl_domain + def initialize(connection, url, opts) + @connection, @opts = connection, opts + @urls = URLs.new(url) if url + opts.each do |k,v| + case k + when :url, :urls, :address + @urls = URLs.new(v) unless @urls + when :reconnect + @reconnect = v + end + end + raise ::ArgumentError.new("no url for connect") unless @urls - def initialize(connection) - @connection = connection - @address = nil - @heartbeat = nil - @reconnect = nil - @ssl_domain = nil + # TODO aconway 2017-08-17: review reconnect configuration and defaults + @reconnect = Backoff.new() unless @reconnect + @ssl_domain = SessionPerConnection.new # TODO seems this should be configurable + @connection.overrides = self + @connection.open end def on_connection_local_open(event) self.connect(event.connection) end def on_connection_remote_open(event) - if !@reconnect.nil? - @reconnect.reset - @transport = nil - end + @reconnect.reset if @reconnect end def on_transport_tail_closed(event) self.on_transport_closed(event) end @@ -71,28 +76,40 @@ def on_connection_remote_close(event) @connection = nil end def connect(connection) - url = @address.next + url = @urls.next + transport = Qpid::Proton::Transport.new + @opts.each do |k,v| + case k + when :user + connection.user = v + when :password + connection.password = v + when :heartbeat + transport.idle_timeout = v.to_i + when :idle_timeout + transport.idle_timeout = v.(v*1000).to_i + when :sasl_enabled + transport.sasl if v + when :sasl_allow_insecure_mechs + transport.sasl.allow_insecure_mechs = v + when :sasl_allowed_mechs, :sasl_mechanisms + transport.sasl.allowed_mechs = v + end + end + + # TODO aconway 2017-08-11: hostname setting is incorrect, reactor only connection.hostname = "#{url.host}:#{url.port}" + connection.user = url.username if url.username && !url.username.empty? + connection.password = url.password if url.password && !url.password.empty? - transport = Qpid::Proton::Transport.new transport.bind(connection) - if !@heartbeat.nil? - transport.idle_timeout = @heartbeat - elsif (url.scheme == "amqps") && !@ssl_domain.nil? + + if (url.scheme == "amqps") && @ssl_domain @ssl = Qpid::Proton::SSL.new(transport, @ssl_domain) - @ss.peer_hostname = url.host - elsif !url.username.nil? - sasl = transport.sasl - if url.username == "anonymous" - sasl.mechanisms("ANONYMOUS") - else - sasl.plain(url.username, url.password) - end + @ssl.peer_hostname = url.host end end - end - end