lib/reactor/container.rb in qpid_proton-0.18.1 vs lib/reactor/container.rb in qpid_proton-0.19.0

- old
+ new

@@ -1,6 +1,5 @@ -#-- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the @@ -13,258 +12,65 @@ # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -#++ -module Qpid::Proton::Reactor - private - class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler +module Qpid::Proton + module Reactor - def initialize - super - end + # @deprecated use {Qpid::Proton::Container} + class Container < Qpid::Proton::Container + include Util::Deprecation - def on_settled(event) - if event.delivery.respond_to? :transaction - event.transaction = event.delivery.transaction - event.delivery.transaction.handle_outcome(event) - end - end + private + alias super_connect connect # Access to superclass method - end + public - public - # A representation of the AMQP concept of a container which, loosely - # speaking, is something that establishes links to or from another - # container on which messages are transferred. - # - # This is an extension to the Reactor classthat adds convenience methods - # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender - # and Qpid::Proton::Receiver. - class Container < Reactor + # @deprecated use {Qpid::Proton::Container} + def initialize(handlers, opts=nil) + deprecated Qpid::Proton::Reactor::Container, Qpid::Proton::Container + h = handlers || (opts && opts[:global_handler]) || Handler::ReactorMessagingAdapter.new(nil) + id = opts && opts[:container_id] + super(h, id) + end - include Qpid::Proton::Util::Reactor + alias container_id id + alias global_handler handler - include Qpid::Proton::Util::UUID - - attr_accessor :container_id - attr_accessor :global_handler - - def initialize(handlers, options = {}) - super(handlers, options) - - # only do the following if we're creating a new instance - if !options.has_key?(:impl) - @ssl = SSLConfig.new - if options[:global_handler] - self.global_handler = GlobalOverrides.new(options[:global_handler]) - else - # very ugly, but using self.global_handler doesn't work in the constructor - ghandler = Reactor.instance_method(:global_handler).bind(self).call - ghandler = GlobalOverrides.new(ghandler) - Reactor.instance_method(:global_handler=).bind(self).call(ghandler) - end - @trigger = nil - @container_id = generate_uuid + def connect(opts=nil) + url = opts && (opts[:url] || opts[:address]) + raise ::ArgumentError.new, "no :url or :address option provided" unless url + super(url, opts) end - end - # Initiate an AMQP connection. - # - # @param url [String] Connect to URL host:port, using user:password@ if present - # @param opts [Hash] Named options - # For backwards compatibility, can be called with a single parameter opts. - # - # @option opts [String] :url Connect to URL host:port using user:password@ if present. - # @option opts [String] :user user name for authentication if not given by URL - # @option opts [String] :password password for authentication if not given by URL - # @option opts [Numeric] :idle_timeout seconds before closing an idle connection, - # can be a fractional value. - # @option opts [Boolean] :sasl_enabled Enable or disable SASL. - # @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text - # passwords, even over an insecure connection. By default, such mechanisms are only allowed - # when SSL is enabled. - # @option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection. - # - # @option opts [String] :address *deprecated* use the :url option - # @option opts [Numeric] :heartbeat milliseconds before closing an idle connection. - # *deprecated* use :idle_timeout => heartbeat/1000 - # - # @return [Connection] the new connection - # - def connect(url, opts = {}) - # Backwards compatible with old connect(options) - if url.is_a? Hash and opts.empty? - opts = url - url = nil - end - conn = self.connection(opts[:handler]) - conn.container = self.container_id || generate_uuid - connector = Connector.new(conn, url, opts) - return conn - end - - private - def _session(context) - if context.is_a?(Qpid::Proton::URL) - return _session(self.connect(:url => context)) - elsif context.is_a?(Qpid::Proton::Session) - return context - elsif context.is_a?(Qpid::Proton::Connection) - if context.session_policy? - return context.session_policy.session(context) - else - return self.create_session(context) + def create_sender(context, opts=nil) + c = context if context.is_a? Qpid::Proton::Connection + unless c + url = Qpid::Proton::uri context + c = super_connect(url, opts) + opts ||= {} + opts[:target] ||= url.amqp_address end - else - return context.session + c.open_sender opts end - end - public - # Initiates the establishment of a link over which messages can be sent. - # - # @param context [String, URL] The context. - # @param opts [Hash] Additional options. - # @option opts [String, Qpid::Proton::URL] The target address. - # @option opts [String] :source The source address. - # @option opts [Boolean] :dynamic - # @option opts [Object] :handler - # @option opts [Object] :tag_generator The tag generator. - # @option opts [Hash] :options Addtional link options - # - # @return [Sender] The sender. - # - def create_sender(context, opts = {}) - if context.is_a?(::String) - context = Qpid::Proton::URL.new(context) - end - - target = opts[:target] - if context.is_a?(Qpid::Proton::URL) && target.nil? - target = context.path - end - - session = _session(context) - - sender = session.sender(opts[:name] || - id(session.connection.container, - target, opts[:source])) - sender.source.address = opts[:source] if !opts[:source].nil? - sender.target.address = target if target - sender.handler = opts[:handler] if !opts[:handler].nil? - sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? - _apply_link_options(opts[:options], sender) - sender.open - return sender - end - - # Initiates the establishment of a link over which messages can be received. - # - # There are two accepted arguments for the context - # 1. If a Connection is supplied then the link is established using that - # object. The source, and optionally the target, address can be supplied - # 2. If it is a String or a URL then a new Connection is created on which - # the link will be attached. If a path is specified, but not the source - # address, then the path of the URL is used as the target address. - # - # The name will be generated for the link if one is not specified. - # - # @param context [Connection, URL, String] The connection or the address. - # @param opts [Hash] Additional otpions. - # @option opts [String, Qpid::Proton::URL] The source address. - # @option opts [String] :target The target address - # @option opts [String] :name The link name. - # @option opts [Boolean] :dynamic - # @option opts [Object] :handler - # @option opts [Hash] :options Additional link options. - # - # @return [Receiver - # - def create_receiver(context, opts = {}) - if context.is_a?(::String) - context = Qpid::Proton::URL.new(context) - end - - source = opts[:source] - if context.is_a?(Qpid::Proton::URL) && source.nil? - source = context.path - end - - session = _session(context) - - receiver = session.receiver(opts[:name] || - id(session.connection.container, - source, opts[:target])) - receiver.source.address = source if source - receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] - receiver.target.address = opts[:target] if !opts[:target].nil? - receiver.handler = opts[:handler] if !opts[:handler].nil? - _apply_link_options(opts[:options], receiver) - receiver.open - return receiver - end - - def declare_transaction(context, handler = nil, settle_before_discharge = false) - if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil? - class << context - attr_accessor :txn_ctl + def create_receiver(context, opts=nil) + c = context if context.is_a? Qpid::Proton::Connection + unless c + url = Qpid::Proton::uri context + c = super_connect(url, opts) + opts ||= {} + opts[:source] ||= url.amqp_address end - context.txn_ctl = self.create_sender(context, nil, "txn-ctl", - InternalTransactionHandler.new()) + c.open_receiver opts end - return Transaction.new(context.txn_ctl, handler, settle_before_discharge) - end - # Initiates a server socket, accepting incoming AMQP connections on the - # interface and port specified. - # - # @param url [] - # @param ssl_domain [] - # - def listen(url, ssl_domain = nil) - url = Qpid::Proton::URL.new(url) - acceptor = self.acceptor(url.host, url.port) - ssl_config = ssl_domain - if ssl_config.nil? && (url.scheme == 'amqps') && @ssl - ssl_config = @ssl.server + def listen(url, ssl_domain = nil) + # TODO aconway 2017-11-29: ssl_domain + super(url) end - if !ssl_config.nil? - acceptor.ssl_domain(ssl_config) - end - return acceptor end - - private - - def id(container, remote, local) - if !local.nil? && !remote.nil? - "#{container}-#{remote}-#{local}" - elsif !local.nil? - "#{container}-#{local}" - elsif !remote.nil? - "#{container}-#{remote}" - else - "#{container}-#{generate_uuid}" - end - end - - def _apply_link_options(options, link) - if !options.nil? && !options.empty? - if !options.is_a?(::List) - options = [Options].flatten - end - - options.each {|option| o.apply(link) if o.test(link)} - end - end - - def to_s - "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>" - end - end - end