# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module Qpid::Proton

  # An AMQP connection.
  class Connection < Endpoint
    PROTON_METHOD_PREFIX = "pn_connection"
    include Util::Wrapper
    include Util::Deprecation

    # @private
    # Look up the wrapper previously stored for +impl+, creating a new one if
    # none is found.
    # @return [Connection, nil] nil when +impl+ is nil
    def self.wrap(impl)
      return nil if impl.nil?
      self.fetch_instance(impl, :pn_connection_attachments) || Connection.new(impl)
    end

    # @private
    def initialize(impl = Cproton.pn_connection)
      super()
      @impl = impl
      @overrides = nil
      @session_policy = nil
      @link_count = 0
      @link_prefix = ""
      # Register this wrapper so {Connection.wrap} can find it again.
      self.class.store_instance(self, :pn_connection_attachments)
    end

    # @return [String] The AMQP hostname for the connection.
    def virtual_host
      Cproton.pn_connection_remote_hostname(@impl)
    end
    deprecated_alias :remote_hostname, :virtual_host

    # @!attribute hostname
    # @deprecated use {#virtual_host}
    proton_set_get :hostname

    # @return [String] User name used for authentication (outgoing connection)
    # or the authenticated user name (incoming connection)
    def user
      # Fall back to the transport's user when none is set on the connection.
      Cproton.pn_connection_get_user(@impl) || (transport && transport.user)
    end

    # @deprecated no replacement
    def overrides?
      deprecated __method__
      false
    end

    # @deprecated no replacement
    def session_policy?
      deprecated __method__
      false
    end

    # @return [Connection] self
    def connection
      self
    end

    # @return [Transport, nil] transport bound to this connection, or nil if unbound.
    def transport
      Transport.wrap(Cproton.pn_connection_transport(@impl))
    end

    # @return AMQP container ID advertised by the remote peer.
    # To get the local container ID use {#container} and {Container#id}
    def container_id
      Cproton.pn_connection_remote_container(@impl)
    end
    deprecated_alias :remote_container, :container_id

    # @return [Container] the container managing this connection
    attr_reader :container

    # @return [Array] offered capabilities provided by the remote peer
    def offered_capabilities
      Codec::Data.to_object(Cproton.pn_connection_remote_offered_capabilities(@impl))
    end
    deprecated_alias :remote_offered_capabilities, :offered_capabilities

    # @return [Array] desired capabilities provided by the remote peer
    def desired_capabilities
      Codec::Data.to_object(Cproton.pn_connection_remote_desired_capabilities(@impl))
    end
    deprecated_alias :remote_desired_capabilities, :desired_capabilities

    # @return [Hash] connection-properties provided by the remote peer
    def properties
      Codec::Data.to_object(Cproton.pn_connection_remote_properties(@impl))
    end
    deprecated_alias :remote_properties, :properties

    # Open the local end of the connection. Does nothing if the local end is
    # already active.
    #
    # @option opts [MessagingHandler] :handler handler for events related to this connection.
    #
    # @option opts [String] :user User name for authentication
    # @option opts [String] :password Authentication secret
    # @option opts [String] :virtual_host Virtual host name
    # @option opts [String] :container_id (provided by {Container}) override advertised container-id
    #
    # @option opts [Hash] :properties Application-defined properties
    # @option opts [Array] :offered_capabilities Extensions the endpoint supports
    # @option opts [Array] :desired_capabilities Extensions the endpoint can use
    #
    # @option opts [Numeric] :idle_timeout Seconds before closing an idle connection
    # @option opts [Integer] :max_sessions Limit the number of active sessions
    # @option opts [Integer] :max_frame_size Limit the size of AMQP frames
    #
    # @option opts [Boolean] :sasl_enabled (false) Enable or disable SASL.
    # @option opts [Boolean] :sasl_allow_insecure_mechs (false) Allow mechanisms send secrets in clear text
    # @option opts [String] :sasl_allowed_mechs SASL mechanisms allowed by this end of the connection
    #
    # @option opts [SSLDomain] :ssl_domain SSL configuration domain.
    #
    def open(opts=nil)
      return if local_active?
      apply opts if opts
      Cproton.pn_connection_open(@impl)
    end

    # @private
    # Apply the connection-level options from +opts+ (see {#open}).
    def apply(opts)
      # NOTE: Only connection options are set here. Transport options are set
      # with {Transport#apply} from the connection_driver (or in
      # on_connection_bound if not using a connection_driver)
      @container = opts[:container]
      # Explicit :container_id wins, then the container's own id, then a random UUID.
      cid = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
      cid = cid.to_s if cid.is_a? Symbol # Allow symbols as container name
      Cproton.pn_connection_set_container(@impl, cid)
      Cproton.pn_connection_set_user(@impl, opts[:user]) if opts[:user]
      Cproton.pn_connection_set_password(@impl, opts[:password]) if opts[:password]
      Cproton.pn_connection_set_hostname(@impl, opts[:virtual_host]) if opts[:virtual_host]
      # Link names generated by #link_name default to being prefixed by the container id.
      @link_prefix = opts[:link_prefix] || cid
      Codec::Data.from_object(Cproton.pn_connection_offered_capabilities(@impl),
                              Types.symbol_array(opts[:offered_capabilities]))
      Codec::Data.from_object(Cproton.pn_connection_desired_capabilities(@impl),
                              Types.symbol_array(opts[:desired_capabilities]))
      Codec::Data.from_object(Cproton.pn_connection_properties(@impl),
                              Types.symbol_keys(opts[:properties]))
    end

    # Idle-timeout advertised by the remote peer, in seconds.
    # @return [Numeric] Idle-timeout advertised by the remote peer, in seconds.
    # @return [nil] if the peer does not advertise an idle time-out, or the
    #   connection is not bound to a transport
    def idle_timeout
      bound = transport
      millis = bound && bound.remote_idle_timeout
      # 0 is truthy in Ruby, so it must be tested explicitly. A zero value is
      # treated as "not advertised", the same convention #max_sessions and
      # #max_frame_size use for their limits.
      return nil if millis.nil? || millis.zero?
      Rational(millis, 1000) # More precise than Float
    end

    # Session limit advertised by the remote peer. See {Connection#open :max_sessions}
    # @return [Integer] maximum number of sessions per connection allowed by remote peer.
    # @return [nil] no specific limit is set.
    # @raise [StateError] if the connection is not bound to a transport
    def max_sessions
      bound = transport
      raise StateError, "connection not bound to transport" unless bound
      limit = bound.remote_channel_max
      limit.zero? ? nil : limit
    end

    # Maximum frame size, in bytes, advertised by the remote peer.
    # See {Connection#open :max_frame_size}
    # @return [Integer] maximum frame size
    # @return [nil] no limit
    # @raise [StateError] if the connection is not bound to a transport
    def max_frame_size
      bound = transport
      raise StateError, "connection not bound to transport" unless bound
      limit = bound.remote_max_frame
      limit.zero? ? nil : limit
    end

    # Closes the local end of the connection. The remote end may or may not be closed.
    # @param error [Condition] Optional error condition to send with the close.
    def close(error=nil)
      Condition.assign(_local_condition, error)
      Cproton.pn_connection_close(@impl)
    end

    # Gets the endpoint current state flags
    #
    # @see Endpoint#LOCAL_UNINIT
    # @see Endpoint#LOCAL_ACTIVE
    # @see Endpoint#LOCAL_CLOSED
    # @see Endpoint#LOCAL_MASK
    #
    # @return [Integer] The state flags.
    #
    def state
      Cproton.pn_connection_state(@impl)
    end

    # Returns the default session for this connection, opening it on first use.
    #
    # @return [Session] The session.
    #
    def default_session
      @session ||= open_session
    end

    # @deprecated use #default_session()
    deprecated_alias :session, :default_session

    # Open a new session on this connection.
    # @return [Session] the newly opened session
    def open_session
      s = Session.wrap(Cproton.pn_session(@impl))
      s.open
      s
    end

    # Open a sender on the default_session
    # @option opts (see Session#open_sender)
    def open_sender(opts=nil)
      default_session.open_sender(opts)
    end

    # Open a receiver on the default_session
    # @option opts (see Session#open_receiver)
    def open_receiver(opts=nil)
      default_session.open_receiver(opts)
    end

    # @deprecated use {#each_session}
    def session_head(mask)
      deprecated __method__, "#each_session"
      Session.wrap(Cproton.pn_session_head(@impl, mask))
    end

    # Get the sessions on this connection.
    # @overload each_session
    #   @yieldparam s [Session] pass each session to block
    # @overload each_session
    #   @return [Enumerator] enumerator over sessions
    def each_session
      return enum_for(:each_session) unless block_given?
      s = Cproton.pn_session_head(@impl, 0)
      while s
        yield Session.wrap(s)
        s = Cproton.pn_session_next(s, 0)
      end
      self
    end

    # @deprecated use {#each_link}
    def link_head(mask)
      deprecated __method__, "#each_link"
      Link.wrap(Cproton.pn_link_head(@impl, mask))
    end

    # Get the links on this connection.
    # @overload each_link
    #   @yieldparam l [Link] pass each link to block
    # @overload each_link
    #   @return [Enumerator] enumerator over links
    def each_link
      return enum_for(:each_link) unless block_given?
      l = Cproton.pn_link_head(@impl, 0)
      while l
        yield Link.wrap(l)
        l = Cproton.pn_link_next(l, 0)
      end
      self
    end

    # Get the {Sender} links - see {#each_link}
    def each_sender
      each_link.select(&:sender?)
    end

    # Get the {Receiver} links - see {#each_link}
    def each_receiver
      each_link.select(&:receiver?)
    end

    # @deprecated use {#MessagingHandler} to handle work
    def work_head
      deprecated __method__
      Delivery.wrap(Cproton.pn_work_head(@impl))
    end

    # @deprecated use {#condition}
    def error
      deprecated __method__, "#condition"
      Cproton.pn_error_code(Cproton.pn_connection_error(@impl))
    end

    # @private Generate a unique link name, internal use only.
    # The name is the link prefix, a "/", and a per-connection counter in base 32.
    def link_name
      "#{@link_prefix}/#{(@link_count += 1).to_s(32)}"
    end

    protected

    def _local_condition
      Cproton.pn_connection_condition(@impl)
    end

    def _remote_condition
      Cproton.pn_connection_remote_condition(@impl)
    end

    proton_get :attachments
  end
end