#-- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. #++ module Qpid::Proton::Reactor # @private class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler def initialize super end def on_settled(event) if event.delivery.respond_to? :transaction event.transaction = event.delivery.transaction event.delivery.transaction.handle_outcome(event) end end end # A representation of the AMQP concept of a container which, loosely # speaking, is something that establishes links to or from another # container on which messages are transferred. # # This is an extension to the Reactor classthat adds convenience methods # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender # and Qpid::Proton::Receiver. # # @example # class Container < Reactor include Qpid::Proton::Util::Reactor include Qpid::Proton::Util::UUID attr_accessor :container_id attr_accessor :global_handler def initialize(handlers, options = {}) super(handlers, options) # only do the following if we're creating a new instance if !options.has_key?(:impl) @ssl = SSLConfig.new if options[:global_handler] self.global_handler = GlobalOverrides.new(options[:global_handler]) else # very ugly, but using self.global_handler doesn't work in the constructor ghandler = Reactor.instance_method(:global_handler).bind(self).call ghandler = GlobalOverrides.new(ghandler) Reactor.instance_method(:global_handler=).bind(self).call(ghandler) end @trigger = nil @container_id = generate_uuid end end # Initiates the establishment of an AMQP connection. # # @param options [Hash] A hash of named arguments. # def connect(options = {}) conn = self.connection(options[:handler]) conn.container = self.container_id || generate_uuid connector = Connector.new(conn) conn.overrides = connector if !options[:url].nil? connector.address = URLs.new([options[:url]]) elsif !options[:urls].nil? connector.address = URLs.new(options[:urls]) elsif !options[:address].nil? connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])]) else raise ::ArgumentError.new("either :url or :urls or :address required") end connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil? if !options[:reconnect].nil? connector.reconnect = options[:reconnect] else connector.reconnect = Backoff.new() end connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable conn.open return conn end def _session(context) if context.is_a?(Qpid::Proton::URL) return self._session(self.connect(:url => context)) elsif context.is_a?(Qpid::Proton::Session) return context elsif context.is_a?(Qpid::Proton::Connection) if context.session_policy? return context.session_policy.session(context) else return self.create_session(context) end else return context.session end end # Initiates the establishment of a link over which messages can be sent. # # @param context [String, URL] The context. # @param opts [Hash] Additional options. # @param opts [String, Qpid::Proton::URL] The target address. # @param opts [String] :source The source address. # @param opts [Boolean] :dynamic # @param opts [Object] :handler # @param opts [Object] :tag_generator The tag generator. # @param opts [Hash] :options Addtional link options # # @return [Sender] The sender. # def create_sender(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end target = opts[:target] if context.is_a?(Qpid::Proton::URL) && target.nil? target = context.path end session = self._session(context) sender = session.sender(opts[:name] || id(session.connection.container, target, opts[:source])) sender.source.address = opts[:source] if !opts[:source].nil? sender.target.address = target if target sender.handler = opts[:handler] if !opts[:handler].nil? sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? self._apply_link_options(opts[:options], sender) sender.open return sender end # Initiates the establishment of a link over which messages can be received. # # There are two accepted arguments for the context # 1. If a Connection is supplied then the link is established using that # object. The source, and optionally the target, address can be supplied # 2. If it is a String or a URL then a new Connection is created on which # the link will be attached. If a path is specified, but not the source # address, then the path of the URL is used as the target address. # # The name will be generated for the link if one is not specified. # # @param context [Connection, URL, String] The connection or the address. # @param opts [Hash] Additional otpions. # @option opts [String, Qpid::Proton::URL] The source address. # @option opts [String] :target The target address # @option opts [String] :name The link name. # @option opts [Boolean] :dynamic # @option opts [Object] :handler # @option opts [Hash] :options Additional link options. # # @return [Receiver # def create_receiver(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end source = opts[:source] if context.is_a?(Qpid::Proton::URL) && source.nil? source = context.path end session = self._session(context) receiver = session.receiver(opts[:name] || id(session.connection.container, source, opts[:target])) receiver.source.address = source if source receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] receiver.target.address = opts[:target] if !opts[:target].nil? receiver.handler = opts[:handler] if !opts[:handler].nil? self._apply_link_options(opts[:options], receiver) receiver.open return receiver end def declare_transaction(context, handler = nil, settle_before_discharge = false) if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil? class << context attr_accessor :txn_ctl end context.txn_ctl = self.create_sender(context, nil, "txn-ctl", InternalTransactionHandler.new()) end return Transaction.new(context.txn_ctl, handler, settle_before_discharge) end # Initiates a server socket, accepting incoming AMQP connections on the # interface and port specified. # # @param url [] # @param ssl_domain [] # def listen(url, ssl_domain = nil) url = Qpid::Proton::URL.new(url) acceptor = self.acceptor(url.host, url.port) ssl_config = ssl_domain if ssl_config.nil? && (url.scheme == 'amqps') && @ssl ssl_config = @ssl.server end if !ssl_config.nil? acceptor.ssl_domain(ssl_config) end return acceptor end def do_work(timeout = nil) self.timeout = timeout unless timeout.nil? self.process end def id(container, remote, local) if !local.nil? && !remote.nil? "#{container}-#{remote}-#{local}" elsif !local.nil? "#{container}-#{local}" elsif !remote.nil? "#{container}-#{remote}" else "#{container}-#{generate_uuid}" end end def _apply_link_options(options, link) if !options.nil? && !options.empty? if !options.is_a?(::List) options = [Options].flatten end options.each {|option| o.apply(link) if o.test(link)} end end def to_s "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>" end end end