# @(#) MQMBID sn=pkoa-L150203 su=_xDgVYKvOEeS63qb0dW9cyg pn=appmsging/ruby/mqlight/lib/mqlight/blocking_client.rb
#
#
# Licensed Materials - Property of IBM
#
# 5725-P60
#
# (C) Copyright IBM Corp. 2014, 2015
#
# US Government Users Restricted Rights - Use, duplication or
# disclosure restricted by GSA ADP Schedule Contract with
# IBM Corp.
#

require 'thread'
require 'securerandom'
require 'uri'
require 'timeout'

module Mqlight
  #
  # The MQ Light Client. This can be used to exchange messages with
  # the MQ Light server. This version of the client blocks the calling thread
  # while carrying out messaging operations.
  #
  # @note this class uses timeouts in milliseconds with zero meaning: "don't
  #   wait at all" and nil meaning "wait forever - don't time out".
  class BlockingClient
    include Qpid::Proton::ExceptionHandling

    # @return [String] the client id, which can either be explicitly specified
    #   when the client is created or automatically generated.
    attr_reader :id

    # @return [Symbol] the current state of the client. This will be one of:
    #   :starting, :started, :stopping, :stopped, :retrying, or :restarted
    # @note not all states are implemented (currently the client will never
    #   transition into :stopping and :restarted states).
    attr_reader :state

    # Increment each time the client has a successful connection.
    attr_reader :connect_id

    # Creates a new instance of the client. The client will be created in
    # starting state. The constructor will make a connection attempt to the
    # server and report failures (such as "not authorised") as
    # exceptions. This means that in the golden path case the constructor
    # will return an instance of the BlockingClient that is in started state.
    # A code block, yielded to by the constructor can be used to register a
    # listener that receives notifications when the associated client changes
    # state.
    #
    # @param service [Array, String] a String containing the URL for the
    #   service to connect to, or alternatively an Array containing a list of
    #   URLs to attempt to connect to in turn. User names and passwords may be
    #   embedded into the URL (e.g. amqp://user:pass@host).
    # @option options [String] :id a unique identifier for this client. A
    #   maximum of one instance of the client (as identified by the value
    #   of this property) can be connected to an MQ Light server at a given
    #   point in time. If another instance of the same client connects, then
    #   the previously connected instance will be disconnected. This is
    #   reported, to the first client, as a ReplacedError being emitted as an
    #   error event and the client transitioning into stopped state. If the id
    #   property is not a valid client identifier (e.g. it contains a colon,
    #   it is too long, or it contains some other forbidden character) then
    #   the function will throw an ArgumentError exception. If this option is
    #   not specified, a probabilistically unique value will be generated by
    #   the client.
    # @option options [String] :user user name for authentication.
    #   Alternatively, the user name may be embedded in the URL passed via the
    #   service property. If you choose to specify a user name via this
    #   property and also embed a user name in the URL passed via the service
    #   argument then all the user names must match otherwise an ArgumentError
    #   exception will be thrown. User names and passwords must be specified
    #   together (or not at all). If you specify just the user property but no
    #   password property an ArgumentError exception will be thrown.
    # @option options [String] :password password for authentication.
    #   Alternatively, the password may be embedded in the URL passed via the
    #   service property.
    # @option options [String] :ssl_trust_certificate SSL trust certificate
    #   to use when authentication is required for the MQ Light server. Only
    #   used when service specifies the amqps scheme.
    # @option options [Boolean] :ssl_verify_name whether or not to additionally
    #   check the MQ Light server's common name in the certificate matches the
    #   actual server's DNS name. Only used when the sslTrustCertificate
    #   option is specified. The default is true.
    #
    # @yield an optional block of code that is called into each time a
    #   transition occurs in the state machine underpinning the client.
    # @yieldparam state [Symbol] the state that the client has now
    #   transitioned into. This will be one of: :starting, :started,
    #   :stopping, :stopped, :retrying, :restarted.
    # @yieldparam reason [Exception, nil] an indication of why the client
    #   transitioned into this state. An Exception is passed back
    #   when the client encounters an exception which causes it to
    #   transition into a new state. A value of nil indicates that
    #   the client transitioned into this state either automatically
    #   or as a result of the user invoking the start or stop
    #   methods.
    #
    # @return [BlockingClient] the newly created instance of the client.
    #
    # @raise [ArgumentError] if one of the arguments supplied to the method
    #   is not valid.
    # @raise [SecurityError] if, during the construction process of the
    #   client, the MQ Light server rejects the client's connection attempt
    #   for a security related reason.
    #
    # @note the :id option does not, currently, implement the behaviour
    #   described when two clients connect using the same value for
    #   this option.
    # @note currently SSL is not supported - thus the :ssl_trust_certificate
    #   and :ssl_verify_name options are not implemented.
    def initialize(service, options = {}, &state_callback)
      @id = options.fetch(:id, nil)
      @user = options.fetch(:user, nil)
      @password = options.fetch(:password, nil)
      @connect_id = 0

      # Validate id
      unless @id.is_a?(String) || @id.nil?
        fail ArgumentError, 'Client identifier must be a String.'
      end

      # Generates an id when none was supplied and resets state/destinations
      set_defaults

      # Validate id some more
      if @id.length > 48
        fail ArgumentError, "Client identifier '#{@id}' is longer than the "\
                            'maximum ID length of 48.'
      end

      # currently client ids are restricted, reject any invalid ones
      invalid_client_id_pattern = /[^A-Za-z0-9%\/\._]+/
      invalid_client_id_pattern.match(@id) do |m|
        fail ArgumentError, "Client Identifier '#{@id}' contains invalid "\
                            "char: #{m[0]}"
      end

      # Validate username and password: exactly one of the two being set is
      # an error.
      if !@user != !@password
        fail ArgumentError, 'Both user and password properties must '\
                            'be specified together.'
      end
      if @user && @password
        unless @user.is_a?(String) && @password.is_a?(String)
          fail ArgumentError, 'Both user and password must be Strings.'
        end
      end

      # Validate service. An Array is taken as-is (its entries are validated
      # later by validate_service_list); a String is either a direct
      # amqp/amqps URL or an http/https service lookup URL.
      @service_list = []
      if service.is_a?(Array)
        @service_list = service
      elsif service.is_a?(String)
        begin
          case URI(service).scheme
          when 'amqp', 'amqps'
            @service_list << service
          when 'http', 'https'
            @service_lookup_uri = service
          end
        rescue StandardError
          # An unparsable URL is treated as "no valid service supplied"
          @service_list = []
          @service_lookup_uri = nil
        end
      end
      if @service_list.length == 0 && @service_lookup_uri.nil?
        fail ArgumentError, 'A valid service must be specified.'
      end

      @state_callback = state_callback

      # Setup queue for sharing with proton thread
      @proton_queue = Queue.new
      @proton_queue_mutex = Mutex.new
      @proton_queue_resource = ConditionVariable.new

      # Setup queue for running any user callbacks in
      @callback_queue = Queue.new

      # Setup queue for returning messages from proton thread
      @message_queue = Queue.new

      # Setup queue for returning acknowledgements or exception
      # from the proton loop to the block caller.
      @reply_queue = Queue.new

      start
    end

    #
    # Performs a settle of the given message/tracker with error handling.
    #
    # @param tracker the messenger tracker identifying the delivery.
    # @raise [Mqlight::InternalError] if the messenger reports a non-zero
    #   errno after the settle call.
    def settle(tracker)
      check_for_error(Cproton.pn_messenger_settle(@messenger_impl, tracker, 0))
      return if Cproton.pn_messenger_errno(@messenger_impl) == 0
      fail Mqlight::InternalError, error
    end

    #
    # Performs an accept of the given message/tracker with error handling.
    #
    # @param tracker the messenger tracker identifying the delivery.
    # @raise [Mqlight::InternalError] if the messenger reports a non-zero
    #   errno after the accept call.
    def accept(tracker)
      Cproton.pn_messenger_accept(@messenger_impl, tracker, 0)
      return if Cproton.pn_messenger_errno(@messenger_impl) == 0
      fail Mqlight::InternalError, error
    end

    # Builds the finalizer that frees the native messenger.
    # @private
    def self.finalize!(impl)
      proc do
        Cproton.pn_messenger_free(impl)
      end
    end

    # Requests that the client transition into started state. This method will
    # block the calling thread until the client has either:
    # 1. Attained started state (effectively being a no-op if the client is
    #    already in started state)
    # 2. Attained stopped state (most likely due to another thread calling the
    #    stop method before the client manages to attain started state).
    #
    # @option options [nil, Numeric] :timeout the period of time (in
    #   milliseconds) to wait for the client to attain started state. If the
    #   client does not attain started state in this period of time a
    #   TimeoutError exception will be thrown by this method and the client
    #   will continue to transition in state, as defined by its underlying
    #   state machine. A value of zero is interpreted as time out immediately
    #   if the client is not already in started state. A value of nil (the
    #   default) is interpreted as never timeout.
    #
    # @return [BlockingClient] the instance of the client that the start
    #   method was invoked upon. This allows for method chaining.
    #
    # @raise [RangeError] if the value specified via the timeout option is
    #   outside of the range of valid values.
    # @raise [StoppedError] if the client transitions into stopped state before
    #   attaining started state.
    # @raise [TimeoutError] if a timeout value is specified and the client does
    #   not transition into started state within this period of time.
    #
    # @note if the client cannot connect to the server, it will throw
    #   NetworkError, not implement the described retry behaviour.
    # @note the :timeout option is not, currently, implemented.
    def start(options = {})
      # Only a stopped client can be started; still return self so that
      # method chaining works as documented.
      return self unless stopped?
      change_state(:starting)

      generate_service_list
      validate_service_list

      # Sort out authentication information
      auth = encoded_credentials

      # Try each service in turn
      last_error = 'Unable to connect to MQ Light'
      @service_list.each do |service|
        service_url = URI(service)

        # Add default port for scheme unless one is specified already
        unless service_url.port
          service_url.port = (service_url.scheme == 'amqps') ? 5671 : 5672
        end

        # "pattern" is the route pattern (without usable credentials) and
        # "address" is the route target (carrying the credentials).
        if service_url.userinfo
          address = service_url
          pattern = service_url.clone
          pattern.userinfo = ''
        else
          pattern = service_url
          address = service_url.clone
          address.userinfo = auth
        end

        # TODO: log each connection attempt once tracing is available
        # puts "Trying #{service_url.scheme}://#{service_url.host}:"\
        #      "#{service_url.port}"
        begin
          # Setup the proton messenger
          @messenger_impl = Cproton.pn_messenger(@id)
          ObjectSpace.define_finalizer(self,
                                       self.class.finalize!(@messenger_impl))
          Cproton.pn_messenger_set_flags(@messenger_impl,
                                         Cproton::PN_FLAGS_CHECK_ROUTES)
          Cproton.pn_messenger_set_incoming_window(@messenger_impl, 1024)
          Cproton.pn_messenger_set_outgoing_window(@messenger_impl, 1024)
          Cproton.pn_messenger_route(@messenger_impl,
                                     (pattern.to_s + '/*'),
                                     (address.to_s + '/$1'))

          # Try to start the messenger
          check_for_error(Cproton.pn_messenger_start(@messenger_impl))

          # Assign the service if we start successfully (without auth info)
          @service = "#{service_url.scheme}://#{service_url.host}:"\
                     "#{service_url.port}"
          change_state(:started)
        rescue Qpid::Proton::ProtonError => e
          last_error = e.to_s
        end
        break if started?
        change_state(:retrying)
      end

      # if we exhausted @service_list without achieving the started state then
      # fail with a suitable error message for the last error that we saw
      unless started?
        if last_error =~ /sasl |SSL /
          fail Mqlight::SecurityError, last_error
        else
          fail Mqlight::NetworkError, last_error
        end
      end

      # New connection; increment count
      @connect_id += 1

      @proton_thread = Thread.new do
        Thread.current['name'] = 'proton_loop'
        proton_loop while started?
        # drain remaining proton_queue requests before Thread completes
        proton_loop until @proton_queue.empty?
      end

      @callback_thread = Thread.new do
        Thread.current['name'] = 'callback_thread'
        callback_loop while started?
      end

      self
    end

    # Requests that the client transition into stopped state. This method will
    # block the calling thread until the client has attained stopped state.
    #
    # @option options [nil, Numeric] :timeout the amount of time (in
    #   milliseconds) to wait to flush any outstanding messages to the
    #   network. A value of zero indicates the client should stop
    #   immediately without attempting to flush messages. A value of nil
    #   (the default) indicates the method will block until all messages
    #   are flushed.
    #
    # @raise [RangeError] if the value specified via the timeout option is
    #   outside of the range of valid values.
    # @raise [TimeoutError] if a timeout value is specified and the client does
    #   not flush any buffered messages within the timeout period. The
    #   client will, however, still transition to stopped state even if
    #   this exception is thrown.
    # @note the timeout option is not, currently, implemented.
    def stop(options = {})
      return unless started?

      # Flip the state under the queue mutex and wake the proton thread so it
      # notices the change and runs down.
      @proton_queue_mutex.synchronize do
        change_state(:stopped)
        @proton_queue_resource.broadcast
      end
      @proton_thread.join

      check_for_error(Cproton.pn_messenger_stop(@messenger_impl))
    end

    # Sends a message to the specified topic, blocking the calling thread while
    # the send operation takes place (or until the timeout value, as specified
    # via the timeout option is exceeded).
    # * For "at most once" quality of service messages (qos option set to 0)
    #   messages the calling thread will block until the client is both
    #   successfully network connected and the message has been buffered
    #   by the client. This method may or may not block until the data has
    #   been flushed to the underlying network, at the discretion of the
    #   client implementation which balances throughput against buffering
    #   large amounts of data.
    # * For "at least once" quality of service messages (qos option set to 1)
    #   messages the calling thread will block until the client is both
    #   successfully network connected and has received confirmation
    #   from the server that the server has received a copy of the message.
    #
    # @param topic [String] the topic to which the message will be sent.
    # @param data [String] the data to send in the message payload.
    # @option options [Numeric] :qos The quality of service to use when
    #   sending the message. 0 is used to denote at most once (the default)
    #   and 1 is used for at least once. If a value which is not 0 and not 1
    #   is specified then this method will raise an ArgumentError.
    # @option options [nil, Numeric] :timeout the minimum amount
    #   of time (in milliseconds) that the client will attempt to send
    #   the message for. If the client is not able to send the message
    #   after this period has elapsed then this method will raise
    #   TimeoutError. A value of zero is interpreted as timeout
    #   immediately. A value of nil (the default) means wait indefinitely.
    # @option options [Numeric] :ttl A time to live value for the message in
    #   milliseconds. MQ Light will endeavour to discard, without delivering,
    #   any copy of the message that has not been delivered within its time to
    #   live period. The default time to live is 604800000 milliseconds
    #   (7 days). The value supplied for this argument must be an Integer
    #   greater than zero, otherwise an ArgumentError will be raised when
    #   this method is called. Values above 4294967295 are capped.
    #
    # @return [BlockingClient] the instance of the client that the send method
    #   was invoked upon. This allows for method chaining.
    #
    # @raise [ArgumentError] if one of the arguments supplied to the method is
    #   not valid.
    # @raise [TimeoutError] if the amount of time taken to process the send
    #   request has exceeded the value specified by the timeout option. If
    #   the send operation is sending a QoS 0 message then the message will
    #   not have been sent. If a QoS 1 message is being sent then the message
    #   may have been sent to the server, but not as yet acknowledged by
    #   the server.
    # @raise [StoppedError] if the method is called while the client is in
    #   stopped state, or has transitioned into stopped state while the send
    #   operation was taking place.
    def send(topic, data, options = {})
      fail Mqlight::StoppedError, 'Not started.' unless started?
      fail ArgumentError, 'topic must be a String' unless topic.is_a? String
      unless data.is_a? String
        fail Mqlight::UnsupportedError,
             "#{data.class.name.split('::').last} "\
             'is not yet supported as a message data type'
      end

      unless options.nil? || options.is_a?(Hash)
        fail ArgumentError, 'options must be a Hash.'
      end
      options ||= {}
      qos = options.fetch(:qos, nil) || QOS_AT_MOST_ONCE
      ttl = options.fetch(:ttl, nil)
      timeout = options.fetch(:timeout, nil)

      set_settle_mode(@messenger_impl, qos)

      unless ttl.nil?
        unless ttl.is_a?(Integer) && ttl > 0
          fail ArgumentError,
               "options:ttl value '#{ttl}' is invalid, must be an unsigned "\
               'non-zero integer number'
        end
        # Cap at the largest unsigned 32 bit value
        ttl = 4_294_967_295 if ttl > 4_294_967_295
      end

      if timeout
        if !timeout.is_a?(Integer) || timeout < 0
          fail ArgumentError, 'timeout must be nil or a unsigned Integer'
        end
        # Milliseconds (public API) to seconds (Timeout / ConditionVariable)
        timeout /= 1000.0
      end

      # Setup the message
      msg = Qpid::Proton::Message.new
      # URI escape anything apart from path separators (/) and all known
      # unreserved characters. URI::DEFAULT_PARSER.escape is what the removed
      # URI.encode used to delegate to.
      unsafe = Regexp.new("[^/#{URI::PATTERN::UNRESERVED}]")
      msg.address = "#{@service}/#{URI::DEFAULT_PARSER.escape(topic, unsafe)}"
      msg.ttl = ttl if ttl
      msg.body = data
      msg.content_type = 'text/plain'

      # Send the message: hand it to the proton thread and wait until the
      # request queue has been drained.
      Timeout.timeout(timeout, Mqlight::TimeoutError) do
        msg.pre_encode
        @proton_queue_mutex.synchronize do
          @proton_queue.push(action: 'send', params: msg.impl)
          @proton_queue_resource.signal
          until @proton_queue.empty?
            @proton_queue_resource.wait(@proton_queue_mutex, timeout)
          end
          @proton_queue_resource.signal
        end
      end

      # Collect status and raise the exception if one is present
      reply = @reply_queue.pop
      fail reply unless reply.nil?
      self
    end

    # Subscribes to receive messages from a destination, identified by the
    # topic pattern argument. The receive(...) method can then be used to
    # retrieve messages, held at the server, for the destination.
    # The client cannot be in stopped or stopping state when this method is
    # called, otherwise a StoppedError will be raised.
    #
    # @param topic_pattern [String] the topic pattern to subscribe to. This
    #   identifies or creates a destination.
    # @option options [Boolean] :auto_confirm when set to true (the default)
    #   the client will automatically confirm delivery of messages when all of
    #   the listeners registered for the client's message event have
    #   returned. When set to false, application code is responsible for
    #   confirming the delivery of messages using the confirm
    #   method, passed via the delivery argument of the listener
    #   registered for message events. auto_confirm is only applicable
    #   when the qos property is set to 1. The qos property is described
    #   later.
    # @option options [Numeric] :qos the quality of service to use for
    #   delivering messages to the subscription. Valid values are: 0 to
    #   denote at most once (the default), and 1 for at least once.
    # @option options [Numeric] :ttl a time-to-live value, in milliseconds,
    #   that is applied to the destination that the client is subscribed to.
    #   This value will replace any previous value, if the destination
    #   already exists. Time to live starts counting down when there are
    #   no instances of a client subscribed to a destination. It is reset
    #   each time a new instance of the client subscribes to the
    #   destination. If time to live counts down to zero then MQ Light
    #   will delete the destination by discarding any messages held at
    #   the destination and not accruing any new messages. The default
    #   value for this property is 0 - which means the destination will be
    #   deleted as soon as there are no clients subscribed to it.
    # @option options [String] :share the name for creating or joining a shared
    #   destination for which messages are anycast between connected
    #   subscribers. If omitted defaults to a private destination (e.g.
    #   messages can only be received by a specific instance of the
    #   client).
    # @return [BlockingClient] the client, to allow method chaining.
    # @raise [StoppedError] if the method is called while the client is in the
    #   stopped state.
    # @raise [SubscribedError] if the client is already subscribed to the
    #   destination.
    def subscribe(topic_pattern, options = {})
      fail Mqlight::StoppedError, 'Not started.' if stopped?

      destination = Mqlight::Destination.new(@service, topic_pattern, options)
      set_settle_mode(@messenger_impl, destination.qos)

      @proton_queue_mutex.synchronize do
        @proton_queue.push(action: 'subscribe', params: destination)
        @proton_queue_resource.signal
        until @proton_queue.empty?
          @proton_queue_resource.wait(@proton_queue_mutex)
          @proton_queue_resource.signal
        end
      end

      # Collect status and raise the exception if one is present
      reply = @reply_queue.pop
      fail reply unless reply.nil?
      self
    end

    # Receive a message from a destination, as identified by the topic pattern
    # used to subscribe to the destination.
    # @param topic_pattern [String] a topic pattern identifying the
    #   destination to attempt to receive messages from. The destination
    #   must previously have been subscribed to using the subscribe method.
    #   This method will block the calling thread until at least one
    #   message is received from the destinations or the operation times
    #   out (see the timeout option).
    # @option options [nil, Numeric] :timeout the period of time
    #   (in milliseconds) to wait for a message to be received from at
    #   least one of the destinations. If no messages are received from
    #   any of the destinations within this time period, then nil is
    #   returned. A value of zero is interpreted as time out
    #   immediately. A value of nil (the default) is interpreted as
    #   never timeout.
    # @option options [String, nil] :share the share name used on the
    #   matching subscribe call, if any.
    # @return (Delivery, nil) either a delivery object - representing the
    #   message received or nil if no message was received (e.g. because
    #   the operation timed out).
    # @raise [StoppedError] if the client is in stopped or stopping state. This
    #   can also occur because another thread calls the stop method while
    #   a thread is blocked inside this receive method.
    # @raise [UnsubscribedError] if one or more of the topic_patterns refers to
    #   a destination that the client not currently subscribed to.
    #   This can also occur because another thread calls the unsubscribe
    #   method while a thread is blocked inside this receive method.
    def receive(topic_pattern, options = {})
      fail Mqlight::StoppedError, 'Not started.' unless started?

      # Validate topic_pattern
      unless topic_pattern.is_a? String
        fail ArgumentError, 'topic_pattern must be a String.'
      end

      # Validate options; nil is accepted and treated as "no options"
      unless options.is_a?(Hash) || options.nil?
        fail ArgumentError, 'options must be a Hash.'
      end
      options ||= {}

      timeout = options.fetch(:timeout, nil)
      unless timeout.nil?
        unless timeout.is_a? Integer
          fail ArgumentError, 'timeout must be nil or an unsigned Integer'
        end
        fail RangeError, 'timeout must be an unsigned Integer' if timeout < 0
      end

      share = options.fetch(:share, nil)
      validate_share(share)

      destination = @destinations.find do |dest|
        dest.match?(topic_pattern, share)
      end

      # Has a matching destination been found?
      if destination.nil?
        if share.nil?
          fail Mqlight::UnsubscribedError,
               'You must be subscribed with '\
               "topic_pattern #{topic_pattern} to receive messages from it."
        end
        fail Mqlight::UnsubscribedError,
             'You must be subscribed with '\
             "topic_pattern #{topic_pattern} and share #{share} to receive "\
             'messages from it.'
      end

      @proton_queue_mutex.synchronize do
        @proton_queue.push(action: 'receive',
                           timeout: timeout,
                           destination: destination)
        @proton_queue_resource.signal
        until @proton_queue.empty?
          @proton_queue_resource.wait(@proton_queue_mutex, timeout)
        end
        @proton_queue_resource.signal
      end

      # Get the message or nil for timeout to return
      @message_queue.pop
    end

    # Unsubscribes from a destination. The client will no longer be able to
    # receive messages from the destination. If another thread is using the
    # receive() methods to retrieve messages from the destination that is being
    # unsubscribed from then the receive() method will return immediately
    # raising an UnsubscribedError.
    #
    # @param topic_pattern [String] the topic pattern to unsubscribe from.
    #   This identifies the destination to unsubscribe from.
    # @option options [Numeric] :ttl sets the destination's time to live as
    #   part of the unsubscribe operation. The default (when this property is
    #   not specified) is not to change the destination's time to live.
    #   When specified the only valid value for this property is 0.
    # @option options [String] :share matched against the share specified on
    #   the subscribe call to determine which destination the client will
    #   unsubscribe from.
    # @return [BlockingClient] the client, to allow method chaining.
    # @raise [StoppedError] if the client is in stopped or stopping state.
    # @raise [UnsubscribedError] if the client is not subscribed to the
    #   destination (e.g. there has been no matching call to the subscribe
    #   method).
    #
    # @note the ttl option is, currently, not supported.
    def unsubscribe(topic_pattern, options = {})
      fail Mqlight::StoppedError, 'Not started' unless started?

      unless topic_pattern.is_a? String
        fail ArgumentError, 'topic_pattern must be a String'
      end
      @topic_pattern = topic_pattern

      # nil options are tolerated, consistent with send and receive
      share = (options || {})[:share]
      validate_share(share)

      destination = @destinations.find do |dest|
        dest.match? topic_pattern, share
      end

      if destination.nil?
        unless share.nil?
          fail Mqlight::UnsubscribedError,
               'client is not subscribed to this address and share'
        end
        fail Mqlight::UnsubscribedError,
             'client is not subscribed to this address'
      end

      @proton_queue_mutex.synchronize do
        @proton_queue.push(action: 'unsubscribe', params: destination)
        @proton_queue_resource.signal
        until @proton_queue.empty?
          @proton_queue_resource.wait(@proton_queue_mutex)
          @proton_queue_resource.signal
        end
      end

      @destinations.delete(destination)
      self
    end

    # @return [nil, String] either the URL of the service that the client is
    #   currently connected to, or nil if the client is not currently
    #   connected to a service.
    def service
      @service if started?
    end

    # @return [Boolean] true when the client is in :started state.
    def started?
      @state == :started
    end

    # @return [Boolean] true when the client is in :stopped state.
    def stopped?
      @state == :stopped
    end

    # @return [Boolean] true when the client is in :retrying state.
    def retrying?
      @state == :retrying
    end

    # @return the error text currently held by the underlying messenger.
    def error
      Cproton.pn_error_text(Cproton.pn_messenger_error(@messenger_impl))
    end

    # @return [String] the client id.
    def to_s
      "#{@id}"
    end

    private

    # Establishes the initial instance state: a generated id (when none was
    # supplied), an empty service list, :stopped state and no destinations.
    # @private
    def set_defaults
      # Generate id if none supplied
      @id ||= 'AUTO_' + SecureRandom.hex[0..6]
      # Empty service list to be populated
      @service_list = []
      # Initialise as stopped
      @state = :stopped
      # Start with no destinations
      @destinations = []
    end

    # Moves the client to new_state (no-op when already there) and, when a
    # state callback was registered, queues a notification for the callback
    # thread.
    # @private
    def change_state(new_state, reason = nil)
      return if @state == new_state
      @state = new_state
      return unless @state_callback
      @callback_queue.push([@state_callback, @state, reason])
    end

    # Resolves the service list via the lookup URI, when one was supplied.
    # @private
    def generate_service_list
      return unless @service_lookup_uri
      # TODO: Retry logic
      @service_list = Mqlight::Util.get_service_urls(@service_lookup_uri)
    end

    # Builds the URL-encoded "user:password" string from the :user and
    # :password options, or nil when they were not both supplied.
    # @private
    def encoded_credentials
      return nil unless @user && @password
      "#{URI.encode_www_form_component(@user)}:"\
        "#{URI.encode_www_form_component(@password)}"
    end

    # Checks the share option shared by receive and unsubscribe.
    # @raise [ArgumentError] if share is neither nil nor a String, or if it
    #   contains a colon.
    # @private
    def validate_share(share)
      return if share.nil?
      unless share.is_a?(String)
        fail ArgumentError, 'share must be a String or nil.'
      end
      return unless share.include? ':'
      fail ArgumentError,
           'share is invalid because it contains a colon (:) character'
    end

    # Checks every entry of the service list: embedded credentials must be a
    # user:password pair matching the :user/:password options (when given),
    # and the scheme must be amqp.
    # @raise [ArgumentError] when an entry fails one of those checks.
    # @private
    def validate_service_list
      property_auth = encoded_credentials

      @service_list.each do |service|
        service_url = URI(service)
        service_auth = service_url.userinfo
        if service_auth
          unless service_auth.split(':').size == 2
            fail ArgumentError,
                 "URLs supplied via the 'service' property must specify "\
                 'both a user name and a password value, or omit both values'
          end
          # NOTE(review): this message embeds the encoded user:password pair;
          # consider redacting the password.
          if property_auth && property_auth != service_auth
            fail ArgumentError,
                 "User name supplied as an argument (#{property_auth}) does "\
                 'not match user name supplied via a service url'\
                 "(#{service_auth})"
          end
        end

        next if service_url.scheme == 'amqp'
        # TODO: remove comment once amqps:// is supported
        # next if service_url.scheme == 'amqps'

        fail ArgumentError,
             "One of the supplied services (#{service}) is not a "\
             'URL scheme that is supported by this client'
      end
    end

    # Takes one queued notification and invokes the user callback with the
    # state and reason as separate arguments.
    # @private
    def callback_loop
      argv = @callback_queue.pop
      callback = argv.shift
      callback.call(*argv)
    end

    # @return the remote idle timeout the messenger reports for the current
    #   service.
    # @private
    def remote_timeout
      Cproton.pn_messenger_get_remote_idle_timeout(@messenger_impl,
                                                   @service.to_s)
    end

    # One iteration of the proton thread: process at most one queued request,
    # wake any waiting caller and, unless stopped, wait for more work while
    # letting the messenger do its own processing.
    # @private
    def proton_loop
      @proton_queue_mutex.synchronize do
        unless @proton_queue.empty?
          begin
            op = @proton_queue.pop(true)
          rescue ThreadError
            # thrown by queue.pop if queue is empty (should never happen)
            return
          end
          dispatch_queued_operation(op)
        end

        @proton_queue_resource.signal
        unless stopped?
          @proton_queue_resource.wait(@proton_queue_mutex,
                                      remote_timeout / 1000)
          Cproton.pn_messenger_work(@messenger_impl, 0)
          @proton_queue_resource.signal
        end
      end
    end

    # Routes a queued request to its handler, reporting (rather than
    # propagating) any StandardError so the proton thread keeps running.
    # @private
    def dispatch_queued_operation(op)
      case op[:action]
      when 'send'
        process_queued_send op[:params]
      when 'subscribe'
        process_queued_subscription op[:params]
      when 'unsubscribe'
        process_queued_unsubscribe op[:params]
      when 'receive'
        check_for_messages(op[:destination], op[:timeout])
      end
    rescue => e
      # TODO: this section needs to be converted into a FDC
      # when the story is implemented
      puts "Ruby: uncaught exception: #{e}"
      puts e.backtrace
    end

    # Puts and sends the message, then posts the outcome to the reply queue:
    # nil if no problem was detected, otherwise an exception describing the
    # issue.
    # @private
    def process_queued_send(msg)
      check_for_error(Cproton.pn_messenger_put(@messenger_impl, msg))
      check_for_error(Cproton.pn_messenger_send(@messenger_impl, 1))

      tracker = Cproton.pn_messenger_outgoing_tracker(@messenger_impl)
      status = Cproton.pn_messenger_status(@messenger_impl, tracker)

      exception =
        case status
        when Cproton::PN_STATUS_ACCEPTED, Cproton::PN_STATUS_SETTLED
          # TODO: future story messenger.settle(inFlight.msg)
          nil
        when Cproton::PN_STATUS_REJECTED
          reject_msg = Mqlight::Util.tracker_condition_description(
            @messenger_impl, tracker, 'send failed - message was rejected')
          Mqlight::ExceptionContainer.new(RangeError.new(reject_msg))
        when Cproton::PN_STATUS_RELEASED
          Mqlight::ExceptionContainer.new(
            Mqlight::InternalError.new('send failed - message was released'))
        when Cproton::PN_STATUS_MODIFIED
          Mqlight::ExceptionContainer.new(
            Mqlight::InternalError.new('send failed - message was modified'))
        when Cproton::PN_STATUS_ABORTED
          Mqlight::ExceptionContainer.new(
            Mqlight::InternalError.new('send failed - message was aborted'))
        when Cproton::PN_STATUS_PENDING
          # TODO: Node JS client performs a messenger.send(); completion
          # tracking may be required in the future.
          nil
        when 0
          # TODO: rspec is generating these.
          nil
        else
          Mqlight::ExceptionContainer.new(
            Mqlight::InternalError.new(
              "send failed - unknown status #{status}"))
        end

      @reply_queue.push(exception)
    rescue Qpid::Proton::TimeoutError
      # Specific capture of the QPid timeout condition
      # Reply back to user with TimeoutError.
      @reply_queue.push(Mqlight::TimeoutError.new(
        'Send request did not complete within the requested period'))
    rescue Qpid::Proton::ProtonError => error
      @reply_queue.push(Mqlight::ExceptionContainer.new(
        Mqlight::InternalError.new(error)))
    end

    # Subscribes the messenger to the destination, waits for the link to
    # become remotely active (or for the messenger to report an error) and
    # posts the outcome to the reply queue.
    # @private
    def process_queued_subscription(destination)
      Cproton.pn_messenger_subscribe_ttl(@messenger_impl,
                                         destination.address,
                                         destination.ttl)
      link = Cproton.pn_messenger_get_link(@messenger_impl,
                                           destination.address, false)

      # block until link is active or error condition detected
      exception = nil
      while (Cproton.pn_link_state(link) & Cproton::PN_REMOTE_ACTIVE) == 0
        # Perform work
        Cproton.pn_messenger_work(@messenger_impl, 0)

        # Check for errors from last work action
        unless Cproton.pn_messenger_errno(@messenger_impl) == 0
          exception = Mqlight::SubscribedError.new(error)
          break
        end

        # Short pause
        sleep 0.1
      end

      # FIXME: shouldn't call link flow unless using manual credit (we're
      # using explicit credit on our recv call)
      # Cproton.pn_link_flow(link, destination.credit) if
      #   destination.credit > 0

      # Store record of subscription, but only when it actually succeeded;
      # a failed subscribe must not become visible to receive/unsubscribe.
      @destinations.push(destination) if exception.nil?

      # Return the acknowledgement
      @reply_queue.push(exception)
    end

    # Detaches or closes the link for the destination and drives the
    # messenger until the remote end has reacted.
    # @private
    def process_queued_unsubscribe(destination)
      # find and close the link
      link = Cproton.pn_messenger_get_link(@messenger_impl,
                                           destination.address, false)
      target = Cproton.pn_link_target(link)
      expiry_policy = Cproton.pn_terminus_get_expiry_policy(target)
      timeout = Cproton.pn_terminus_get_timeout(target)

      # if we're not expiring the link, we won't get an ACK from the server
      # so all we can do is wait until our request has gone over the network
      if timeout > 0 || expiry_policy == Cproton::PN_EXPIRE_NEVER
        Cproton.pn_link_detach(link)
        until Cproton.pn_link_remote_detached(link)
          Cproton.pn_messenger_work(@messenger_impl, 0)
        end
        Cproton.pn_link_free(link)
      else
        # otherwise we can wait for server-side confirmation of the close
        Cproton.pn_link_close(link)
        while (Cproton.pn_link_state(link) & Cproton::PN_REMOTE_CLOSED) == 0
          Cproton.pn_messenger_work(@messenger_impl, 0)
        end
      end
    end

    # Attempts to receive a single message for the destination and pushes
    # either a Delivery or nil (nothing received) onto the message queue.
    #
    # @param destination the subscribed destination to receive from.
    # @param timeout [Integer, nil] milliseconds to wait; nil waits until a
    #   message arrives or the client leaves started state.
    # @private
    def check_for_messages(destination, timeout = nil)
      link = Cproton.pn_messenger_get_link(@messenger_impl,
                                           destination.address, false)
      unless link
        @message_queue.push(nil)
        return
      end

      Cproton.pn_link_flow(link, 1) if Cproton.pn_link_credit(link) == 0

      loop do
        begin
          break unless started?
          idle_timeout = remote_timeout
          if timeout.nil?
            loop_timeout = (idle_timeout > 0) ? idle_timeout / 2 : -1
          elsif idle_timeout > 0
            loop_timeout = [timeout, idle_timeout].min
          else
            # No usable idle timeout reported: wait for the caller's timeout
            # in one go rather than spinning or blocking indefinitely.
            loop_timeout = timeout
          end
          Cproton.pn_messenger_set_timeout(@messenger_impl, loop_timeout)
          check_for_error(Cproton.pn_messenger_recv(@messenger_impl, -2))
          break
        rescue Qpid::Proton::TimeoutError
          Cproton.pn_messenger_work(@messenger_impl, 0)
          next if timeout.nil?
          timeout -= loop_timeout
          break if timeout <= 0
        end
      end
      # Clear the timeout
      Cproton.pn_messenger_set_timeout(@messenger_impl, -1)

      incoming_count = Cproton.pn_messenger_incoming(@messenger_impl)
      if incoming_count == 0
        # if no message was received, we set the drain flag and wait for the
        # server to advance the delivery-count, consuming our credit
        # XXX: temporarily disabled as this requires a service update
        # Cproton.pn_link_set_drain(link, true)
        # while Cproton.pn_link_draining(link)
        #   Cproton.pn_messenger_work(@messenger_impl, 0)
        #   sleep 0.1
        # end
        incoming_count = Cproton.pn_messenger_incoming(@messenger_impl)
        unless incoming_count > 0
          @message_queue.push(nil)
          return
        end
      end

      msg = Qpid::Proton::Message.new
      begin
        check_for_error(Cproton.pn_messenger_get(@messenger_impl, msg.impl))
        msg.post_decode
      rescue Qpid::Proton::Error => error
        # NOTE(review): other rescues in this class use
        # Qpid::Proton::ProtonError - confirm this is the intended class.
        raise "ERROR: #{error.message}"
      end

      tracker = Cproton.pn_messenger_incoming_tracker(@messenger_impl)
      message = Mqlight::Delivery.new(msg, destination, self, tracker)
      @message_queue.push(message)

      # QoS 0
      accept(tracker) if destination.qos == QOS_AT_MOST_ONCE
      # QoS 1 / auto-confirm
      if destination.qos == QOS_AT_LEAST_ONCE && destination.auto_confirm
        settle(tracker)
      end

      return unless incoming_count > 1
      fail "unexpectedly received #{incoming_count} messages when only 1 "\
           'was expected'
    end

    #
    # Configures the settle mode based on the given QoS
    #
    # @param messenger the native messenger to configure.
    # @param qos [Integer] 0 (at most once) or 1 (at least once).
    # @raise [ArgumentError] if qos is neither 0 nor 1.
    def set_settle_mode(messenger, qos)
      snd_mode =
        case qos
        when 0 then Cproton::PN_SND_SETTLED
        when 1 then Cproton::PN_SND_UNSETTLED
        else
          fail ArgumentError, "Argument qos=#{qos} is an invalid value. " \
            'Must be either QOS_AT_MOST_ONCE(0) or QOS_AT_LEAST_ONCE(1)'
        end
      Cproton.pn_messenger_set_snd_settle_mode(messenger, snd_mode)
      Cproton.pn_messenger_set_rcv_settle_mode(messenger,
                                               Cproton::PN_RCV_FIRST)
    end
  end
end