lib/reactor/container.rb in qpid_proton-0.17.0 vs lib/reactor/container.rb in qpid_proton-0.18.0
- old
+ new
@@ -17,11 +17,11 @@
# under the License.
#++
module Qpid::Proton::Reactor
- # @private
+ private
class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler
def initialize
super
end
@@ -33,21 +33,18 @@
end
end
end
-
+ public
# A representation of the AMQP concept of a container which, loosely
# speaking, is something that establishes links to or from another
# container on which messages are transferred.
#
# This is an extension to the Reactor classthat adds convenience methods
# for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender
# and Qpid::Proton::Receiver.
- #
- # @example
- #
class Container < Reactor
include Qpid::Proton::Util::Reactor
include Qpid::Proton::Util::UUID
@@ -72,46 +69,49 @@
@trigger = nil
@container_id = generate_uuid
end
end
- # Initiates the establishment of an AMQP connection.
+ # Initiate an AMQP connection.
#
- # @param options [Hash] A hash of named arguments.
+ # @param url [String] Connect to URL host:port, using user:password@ if present
+ # @param opts [Hash] Named options
+ # For backwards compatibility, can be called with a single parameter opts.
#
- def connect(options = {})
- conn = self.connection(options[:handler])
- conn.container = self.container_id || generate_uuid
- connector = Connector.new(conn)
- conn.overrides = connector
- if !options[:url].nil?
- connector.address = URLs.new([options[:url]])
- elsif !options[:urls].nil?
- connector.address = URLs.new(options[:urls])
- elsif !options[:address].nil?
- connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])])
- else
- raise ::ArgumentError.new("either :url or :urls or :address required")
+ # @option opts [String] :url Connect to URL host:port using user:password@ if present.
+ # @option opts [String] :user user name for authentication if not given by URL
+ # @option opts [String] :password password for authentication if not given by URL
+ # @option opts [Numeric] :idle_timeout seconds before closing an idle connection,
+ # can be a fractional value.
+ # @option opts [Boolean] :sasl_enabled Enable or disable SASL.
+ # @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text
+ # passwords, even over an insecure connection. By default, such mechanisms are only allowed
+ # when SSL is enabled.
+ # @option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection.
+ #
+ # @option opts [String] :address *deprecated* use the :url option
+ # @option opts [Numeric] :heartbeat milliseconds before closing an idle connection.
+ # *deprecated* use :idle_timeout => heartbeat/1000
+ #
+ # @return [Connection] the new connection
+ #
+ def connect(url, opts = {})
+ # Backwards compatible with old connect(options)
+ if url.is_a? Hash and opts.empty?
+ opts = url
+ url = nil
end
-
- connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil?
- if !options[:reconnect].nil?
- connector.reconnect = options[:reconnect]
- else
- connector.reconnect = Backoff.new()
- end
-
- connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable
-
- conn.open
-
+ conn = self.connection(opts[:handler])
+ conn.container = self.container_id || generate_uuid
+ connector = Connector.new(conn, url, opts)
return conn
end
+ private
def _session(context)
if context.is_a?(Qpid::Proton::URL)
- return self._session(self.connect(:url => context))
+ return _session(self.connect(:url => context))
elsif context.is_a?(Qpid::Proton::Session)
return context
elsif context.is_a?(Qpid::Proton::Connection)
if context.session_policy?
return context.session_policy.session(context)
@@ -121,20 +121,21 @@
else
return context.session
end
end
+ public
# Initiates the establishment of a link over which messages can be sent.
#
# @param context [String, URL] The context.
# @param opts [Hash] Additional options.
- # @param opts [String, Qpid::Proton::URL] The target address.
- # @param opts [String] :source The source address.
- # @param opts [Boolean] :dynamic
- # @param opts [Object] :handler
- # @param opts [Object] :tag_generator The tag generator.
- # @param opts [Hash] :options Addtional link options
+ # @option opts [String, Qpid::Proton::URL] The target address.
+ # @option opts [String] :source The source address.
+ # @option opts [Boolean] :dynamic
+ # @option opts [Object] :handler
+ # @option opts [Object] :tag_generator The tag generator.
+ # @option opts [Hash] :options Addtional link options
#
# @return [Sender] The sender.
#
def create_sender(context, opts = {})
if context.is_a?(::String)
@@ -144,20 +145,20 @@
target = opts[:target]
if context.is_a?(Qpid::Proton::URL) && target.nil?
target = context.path
end
- session = self._session(context)
+ session = _session(context)
sender = session.sender(opts[:name] ||
id(session.connection.container,
target, opts[:source]))
sender.source.address = opts[:source] if !opts[:source].nil?
sender.target.address = target if target
sender.handler = opts[:handler] if !opts[:handler].nil?
sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil?
- self._apply_link_options(opts[:options], sender)
+ _apply_link_options(opts[:options], sender)
sender.open
return sender
end
# Initiates the establishment of a link over which messages can be received.
@@ -190,20 +191,20 @@
source = opts[:source]
if context.is_a?(Qpid::Proton::URL) && source.nil?
source = context.path
end
- session = self._session(context)
+ session = _session(context)
receiver = session.receiver(opts[:name] ||
id(session.connection.container,
source, opts[:target]))
receiver.source.address = source if source
receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
receiver.target.address = opts[:target] if !opts[:target].nil?
receiver.handler = opts[:handler] if !opts[:handler].nil?
- self._apply_link_options(opts[:options], receiver)
+ _apply_link_options(opts[:options], receiver)
receiver.open
return receiver
end
def declare_transaction(context, handler = nil, settle_before_discharge = false)
@@ -234,13 +235,10 @@
acceptor.ssl_domain(ssl_config)
end
return acceptor
end
- def do_work(timeout = nil)
- self.timeout = timeout unless timeout.nil?
- self.process
- end
+ private
def id(container, remote, local)
if !local.nil? && !remote.nil?
"#{container}-#{remote}-#{local}"
elsif !local.nil?