lib/reactor/container.rb in qpid_proton-0.18.1 vs lib/reactor/container.rb in qpid_proton-0.19.0
- old
+ new
@@ -1,6 +1,5 @@
-#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
@@ -13,258 +12,65 @@
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#++
-module Qpid::Proton::Reactor
- private
- class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler
+module Qpid::Proton
+ module Reactor
- def initialize
- super
- end
+ # @deprecated use {Qpid::Proton::Container}
+ class Container < Qpid::Proton::Container
+ include Util::Deprecation
- def on_settled(event)
- if event.delivery.respond_to? :transaction
- event.transaction = event.delivery.transaction
- event.delivery.transaction.handle_outcome(event)
- end
- end
+ private
+ alias super_connect connect # Access to superclass method
- end
+ public
- public
- # A representation of the AMQP concept of a container which, loosely
- # speaking, is something that establishes links to or from another
- # container on which messages are transferred.
- #
- # This is an extension to the Reactor classthat adds convenience methods
- # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender
- # and Qpid::Proton::Receiver.
- class Container < Reactor
+ # @deprecated use {Qpid::Proton::Container}
+ def initialize(handlers, opts=nil)
+ deprecated Qpid::Proton::Reactor::Container, Qpid::Proton::Container
+ h = handlers || (opts && opts[:global_handler]) || Handler::ReactorMessagingAdapter.new(nil)
+ id = opts && opts[:container_id]
+ super(h, id)
+ end
- include Qpid::Proton::Util::Reactor
+ alias container_id id
+ alias global_handler handler
- include Qpid::Proton::Util::UUID
-
- attr_accessor :container_id
- attr_accessor :global_handler
-
- def initialize(handlers, options = {})
- super(handlers, options)
-
- # only do the following if we're creating a new instance
- if !options.has_key?(:impl)
- @ssl = SSLConfig.new
- if options[:global_handler]
- self.global_handler = GlobalOverrides.new(options[:global_handler])
- else
- # very ugly, but using self.global_handler doesn't work in the constructor
- ghandler = Reactor.instance_method(:global_handler).bind(self).call
- ghandler = GlobalOverrides.new(ghandler)
- Reactor.instance_method(:global_handler=).bind(self).call(ghandler)
- end
- @trigger = nil
- @container_id = generate_uuid
+ def connect(opts=nil)
+ url = opts && (opts[:url] || opts[:address])
+ raise ::ArgumentError.new, "no :url or :address option provided" unless url
+ super(url, opts)
end
- end
- # Initiate an AMQP connection.
- #
- # @param url [String] Connect to URL host:port, using user:password@ if present
- # @param opts [Hash] Named options
- # For backwards compatibility, can be called with a single parameter opts.
- #
- # @option opts [String] :url Connect to URL host:port using user:password@ if present.
- # @option opts [String] :user user name for authentication if not given by URL
- # @option opts [String] :password password for authentication if not given by URL
- # @option opts [Numeric] :idle_timeout seconds before closing an idle connection,
- # can be a fractional value.
- # @option opts [Boolean] :sasl_enabled Enable or disable SASL.
- # @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text
- # passwords, even over an insecure connection. By default, such mechanisms are only allowed
- # when SSL is enabled.
- # @option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection.
- #
- # @option opts [String] :address *deprecated* use the :url option
- # @option opts [Numeric] :heartbeat milliseconds before closing an idle connection.
- # *deprecated* use :idle_timeout => heartbeat/1000
- #
- # @return [Connection] the new connection
- #
- def connect(url, opts = {})
- # Backwards compatible with old connect(options)
- if url.is_a? Hash and opts.empty?
- opts = url
- url = nil
- end
- conn = self.connection(opts[:handler])
- conn.container = self.container_id || generate_uuid
- connector = Connector.new(conn, url, opts)
- return conn
- end
-
- private
- def _session(context)
- if context.is_a?(Qpid::Proton::URL)
- return _session(self.connect(:url => context))
- elsif context.is_a?(Qpid::Proton::Session)
- return context
- elsif context.is_a?(Qpid::Proton::Connection)
- if context.session_policy?
- return context.session_policy.session(context)
- else
- return self.create_session(context)
+ def create_sender(context, opts=nil)
+ c = context if context.is_a? Qpid::Proton::Connection
+ unless c
+ url = Qpid::Proton::uri context
+ c = super_connect(url, opts)
+ opts ||= {}
+ opts[:target] ||= url.amqp_address
end
- else
- return context.session
+ c.open_sender opts
end
- end
- public
- # Initiates the establishment of a link over which messages can be sent.
- #
- # @param context [String, URL] The context.
- # @param opts [Hash] Additional options.
- # @option opts [String, Qpid::Proton::URL] The target address.
- # @option opts [String] :source The source address.
- # @option opts [Boolean] :dynamic
- # @option opts [Object] :handler
- # @option opts [Object] :tag_generator The tag generator.
- # @option opts [Hash] :options Addtional link options
- #
- # @return [Sender] The sender.
- #
- def create_sender(context, opts = {})
- if context.is_a?(::String)
- context = Qpid::Proton::URL.new(context)
- end
-
- target = opts[:target]
- if context.is_a?(Qpid::Proton::URL) && target.nil?
- target = context.path
- end
-
- session = _session(context)
-
- sender = session.sender(opts[:name] ||
- id(session.connection.container,
- target, opts[:source]))
- sender.source.address = opts[:source] if !opts[:source].nil?
- sender.target.address = target if target
- sender.handler = opts[:handler] if !opts[:handler].nil?
- sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil?
- _apply_link_options(opts[:options], sender)
- sender.open
- return sender
- end
-
- # Initiates the establishment of a link over which messages can be received.
- #
- # There are two accepted arguments for the context
- # 1. If a Connection is supplied then the link is established using that
- # object. The source, and optionally the target, address can be supplied
- # 2. If it is a String or a URL then a new Connection is created on which
- # the link will be attached. If a path is specified, but not the source
- # address, then the path of the URL is used as the target address.
- #
- # The name will be generated for the link if one is not specified.
- #
- # @param context [Connection, URL, String] The connection or the address.
- # @param opts [Hash] Additional otpions.
- # @option opts [String, Qpid::Proton::URL] The source address.
- # @option opts [String] :target The target address
- # @option opts [String] :name The link name.
- # @option opts [Boolean] :dynamic
- # @option opts [Object] :handler
- # @option opts [Hash] :options Additional link options.
- #
- # @return [Receiver
- #
- def create_receiver(context, opts = {})
- if context.is_a?(::String)
- context = Qpid::Proton::URL.new(context)
- end
-
- source = opts[:source]
- if context.is_a?(Qpid::Proton::URL) && source.nil?
- source = context.path
- end
-
- session = _session(context)
-
- receiver = session.receiver(opts[:name] ||
- id(session.connection.container,
- source, opts[:target]))
- receiver.source.address = source if source
- receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
- receiver.target.address = opts[:target] if !opts[:target].nil?
- receiver.handler = opts[:handler] if !opts[:handler].nil?
- _apply_link_options(opts[:options], receiver)
- receiver.open
- return receiver
- end
-
- def declare_transaction(context, handler = nil, settle_before_discharge = false)
- if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil?
- class << context
- attr_accessor :txn_ctl
+ def create_receiver(context, opts=nil)
+ c = context if context.is_a? Qpid::Proton::Connection
+ unless c
+ url = Qpid::Proton::uri context
+ c = super_connect(url, opts)
+ opts ||= {}
+ opts[:source] ||= url.amqp_address
end
- context.txn_ctl = self.create_sender(context, nil, "txn-ctl",
- InternalTransactionHandler.new())
+ c.open_receiver opts
end
- return Transaction.new(context.txn_ctl, handler, settle_before_discharge)
- end
- # Initiates a server socket, accepting incoming AMQP connections on the
- # interface and port specified.
- #
- # @param url []
- # @param ssl_domain []
- #
- def listen(url, ssl_domain = nil)
- url = Qpid::Proton::URL.new(url)
- acceptor = self.acceptor(url.host, url.port)
- ssl_config = ssl_domain
- if ssl_config.nil? && (url.scheme == 'amqps') && @ssl
- ssl_config = @ssl.server
+ def listen(url, ssl_domain = nil)
+ # TODO aconway 2017-11-29: ssl_domain
+ super(url)
end
- if !ssl_config.nil?
- acceptor.ssl_domain(ssl_config)
- end
- return acceptor
end
-
- private
-
- def id(container, remote, local)
- if !local.nil? && !remote.nil?
- "#{container}-#{remote}-#{local}"
- elsif !local.nil?
- "#{container}-#{local}"
- elsif !remote.nil?
- "#{container}-#{remote}"
- else
- "#{container}-#{generate_uuid}"
- end
- end
-
- def _apply_link_options(options, link)
- if !options.nil? && !options.empty?
- if !options.is_a?(::List)
- options = [Options].flatten
- end
-
- options.each {|option| o.apply(link) if o.test(link)}
- end
- end
-
- def to_s
- "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>"
- end
-
end
-
end