lib/reactor/container.rb in qpid_proton-0.17.0 vs lib/reactor/container.rb in qpid_proton-0.18.0

- old
+ new

@@ -17,11 +17,11 @@ # under the License. #++ module Qpid::Proton::Reactor - # @private + private class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler def initialize super end @@ -33,21 +33,18 @@ end end end - + public # A representation of the AMQP concept of a container which, loosely # speaking, is something that establishes links to or from another # container on which messages are transferred. # # This is an extension to the Reactor classthat adds convenience methods # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender # and Qpid::Proton::Receiver. - # - # @example - # class Container < Reactor include Qpid::Proton::Util::Reactor include Qpid::Proton::Util::UUID @@ -72,46 +69,49 @@ @trigger = nil @container_id = generate_uuid end end - # Initiates the establishment of an AMQP connection. + # Initiate an AMQP connection. # - # @param options [Hash] A hash of named arguments. + # @param url [String] Connect to URL host:port, using user:password@ if present + # @param opts [Hash] Named options + # For backwards compatibility, can be called with a single parameter opts. # - def connect(options = {}) - conn = self.connection(options[:handler]) - conn.container = self.container_id || generate_uuid - connector = Connector.new(conn) - conn.overrides = connector - if !options[:url].nil? - connector.address = URLs.new([options[:url]]) - elsif !options[:urls].nil? - connector.address = URLs.new(options[:urls]) - elsif !options[:address].nil? - connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])]) - else - raise ::ArgumentError.new("either :url or :urls or :address required") + # @option opts [String] :url Connect to URL host:port using user:password@ if present. + # @option opts [String] :user user name for authentication if not given by URL + # @option opts [String] :password password for authentication if not given by URL + # @option opts [Numeric] :idle_timeout seconds before closing an idle connection, + # can be a fractional value. + # @option opts [Boolean] :sasl_enabled Enable or disable SASL. + # @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text + # passwords, even over an insecure connection. By default, such mechanisms are only allowed + # when SSL is enabled. + # @option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection. + # + # @option opts [String] :address *deprecated* use the :url option + # @option opts [Numeric] :heartbeat milliseconds before closing an idle connection. + # *deprecated* use :idle_timeout => heartbeat/1000 + # + # @return [Connection] the new connection + # + def connect(url, opts = {}) + # Backwards compatible with old connect(options) + if url.is_a? Hash and opts.empty? + opts = url + url = nil end - - connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil? - if !options[:reconnect].nil? - connector.reconnect = options[:reconnect] - else - connector.reconnect = Backoff.new() - end - - connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable - - conn.open - + conn = self.connection(opts[:handler]) + conn.container = self.container_id || generate_uuid + connector = Connector.new(conn, url, opts) return conn end + private def _session(context) if context.is_a?(Qpid::Proton::URL) - return self._session(self.connect(:url => context)) + return _session(self.connect(:url => context)) elsif context.is_a?(Qpid::Proton::Session) return context elsif context.is_a?(Qpid::Proton::Connection) if context.session_policy? return context.session_policy.session(context) @@ -121,20 +121,21 @@ else return context.session end end + public # Initiates the establishment of a link over which messages can be sent. # # @param context [String, URL] The context. # @param opts [Hash] Additional options. - # @param opts [String, Qpid::Proton::URL] The target address. - # @param opts [String] :source The source address. - # @param opts [Boolean] :dynamic - # @param opts [Object] :handler - # @param opts [Object] :tag_generator The tag generator. - # @param opts [Hash] :options Addtional link options + # @option opts [String, Qpid::Proton::URL] The target address. + # @option opts [String] :source The source address. + # @option opts [Boolean] :dynamic + # @option opts [Object] :handler + # @option opts [Object] :tag_generator The tag generator. + # @option opts [Hash] :options Addtional link options # # @return [Sender] The sender. # def create_sender(context, opts = {}) if context.is_a?(::String) @@ -144,20 +145,20 @@ target = opts[:target] if context.is_a?(Qpid::Proton::URL) && target.nil? target = context.path end - session = self._session(context) + session = _session(context) sender = session.sender(opts[:name] || id(session.connection.container, target, opts[:source])) sender.source.address = opts[:source] if !opts[:source].nil? sender.target.address = target if target sender.handler = opts[:handler] if !opts[:handler].nil? sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? - self._apply_link_options(opts[:options], sender) + _apply_link_options(opts[:options], sender) sender.open return sender end # Initiates the establishment of a link over which messages can be received. @@ -190,20 +191,20 @@ source = opts[:source] if context.is_a?(Qpid::Proton::URL) && source.nil? source = context.path end - session = self._session(context) + session = _session(context) receiver = session.receiver(opts[:name] || id(session.connection.container, source, opts[:target])) receiver.source.address = source if source receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] receiver.target.address = opts[:target] if !opts[:target].nil? receiver.handler = opts[:handler] if !opts[:handler].nil? - self._apply_link_options(opts[:options], receiver) + _apply_link_options(opts[:options], receiver) receiver.open return receiver end def declare_transaction(context, handler = nil, settle_before_discharge = false) @@ -234,13 +235,10 @@ acceptor.ssl_domain(ssl_config) end return acceptor end - def do_work(timeout = nil) - self.timeout = timeout unless timeout.nil? - self.process - end + private def id(container, remote, local) if !local.nil? && !remote.nil? "#{container}-#{remote}-#{local}" elsif !local.nil?