lib/reactor/connector.rb in qpid_proton-0.17.0 vs lib/reactor/connector.rb in qpid_proton-0.18.0
- old
+ new
@@ -19,31 +19,36 @@
module Qpid::Proton::Reactor
class Connector < Qpid::Proton::BaseHandler
- attr_accessor :address
- attr_accessor :reconnect
- attr_accessor :ssl_domain
+ def initialize(connection, url, opts)
+ @connection, @opts = connection, opts
+ @urls = URLs.new(url) if url
+ opts.each do |k,v|
+ case k
+ when :url, :urls, :address
+ @urls = URLs.new(v) unless @urls
+ when :reconnect
+ @reconnect = v
+ end
+ end
+ raise ::ArgumentError.new("no url for connect") unless @urls
- def initialize(connection)
- @connection = connection
- @address = nil
- @heartbeat = nil
- @reconnect = nil
- @ssl_domain = nil
+ # TODO aconway 2017-08-17: review reconnect configuration and defaults
+ @reconnect = Backoff.new() unless @reconnect
+ @ssl_domain = SessionPerConnection.new # TODO seems this should be configurable
+ @connection.overrides = self
+ @connection.open
end
def on_connection_local_open(event)
self.connect(event.connection)
end
def on_connection_remote_open(event)
- if !@reconnect.nil?
- @reconnect.reset
- @transport = nil
- end
+ @reconnect.reset if @reconnect
end
def on_transport_tail_closed(event)
self.on_transport_closed(event)
end
@@ -71,28 +76,40 @@
def on_connection_remote_close(event)
@connection = nil
end
def connect(connection)
- url = @address.next
+ url = @urls.next
+ transport = Qpid::Proton::Transport.new
+ @opts.each do |k,v|
+ case k
+ when :user
+ connection.user = v
+ when :password
+ connection.password = v
+ when :heartbeat
+ transport.idle_timeout = v.to_i
+ when :idle_timeout
+ transport.idle_timeout = v.(v*1000).to_i
+ when :sasl_enabled
+ transport.sasl if v
+ when :sasl_allow_insecure_mechs
+ transport.sasl.allow_insecure_mechs = v
+ when :sasl_allowed_mechs, :sasl_mechanisms
+ transport.sasl.allowed_mechs = v
+ end
+ end
+
+ # TODO aconway 2017-08-11: hostname setting is incorrect, reactor only
connection.hostname = "#{url.host}:#{url.port}"
+ connection.user = url.username if url.username && !url.username.empty?
+ connection.password = url.password if url.password && !url.password.empty?
- transport = Qpid::Proton::Transport.new
transport.bind(connection)
- if !@heartbeat.nil?
- transport.idle_timeout = @heartbeat
- elsif (url.scheme == "amqps") && !@ssl_domain.nil?
+
+ if (url.scheme == "amqps") && @ssl_domain
@ssl = Qpid::Proton::SSL.new(transport, @ssl_domain)
- @ss.peer_hostname = url.host
- elsif !url.username.nil?
- sasl = transport.sasl
- if url.username == "anonymous"
- sasl.mechanisms("ANONYMOUS")
- else
- sasl.plain(url.username, url.password)
- end
+ @ssl.peer_hostname = url.host
end
end
-
end
-
end