lib/qpid_proton/messenger.rb in qpid_proton-0.8 vs lib/qpid_proton/messenger.rb in qpid_proton-0.9.0

- old
+ new

@@ -15,13 +15,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # -module Qpid +module Qpid # :nodoc: - module Proton + module Proton # :nodoc: # The +Messenger+ class defines a high level interface for # sending and receiving Messages. Every Messenger contains # a single logical queue of incoming messages and a single # logical queue of outgoing messages. These messages in these @@ -57,10 +57,15 @@ # class Messenger include Qpid::Proton::ExceptionHandling + can_raise_exception [:send, :receive, :password=, :start, :stop, + :perform_put, :perform_get, :interrupt, + :route, :rewrite, :accept, :reject, + :incoming_window=, :outgoing_window=] + # Creates a new +Messenger+. # # The +name+ parameter is optional. If one is not provided then # a unique name is generated. # @@ -92,11 +97,11 @@ # ==== Arguments # # * password - the password # def password=(password) - check_for_error(Cproton.pn_messenger_set_password(@impl, password)) + Cproton.pn_messenger_set_password(@impl, password) end # Returns the password property for the Messenger.private_key file. # def password @@ -177,45 +182,60 @@ # def error Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) end + # Clears the current error state. + # + def clear_error + error = Cproton.pn_messenger_error(@impl) + unless error.nil? + Cproton.pn_error_clear(error) + end + end + # Currently a no-op placeholder. # For future compatibility, do not send or recv messages # before starting the +Messenger+. # def start - check_for_error(Cproton.pn_messenger_start(@impl)) + Cproton.pn_messenger_start(@impl) end # Stops the +Messenger+, preventing it from sending or receiving # any more messages. # def stop - check_for_error(Cproton.pn_messenger_stop(@impl)) + Cproton.pn_messenger_stop(@impl) end - # Returns true iff a Messenger is in the stopped state. + # Returns true if a Messenger is in the stopped state. # This function does not block. # - def stopped + def stopped? Cproton.pn_messenger_stopped(@impl) end # Subscribes the Messenger to messages originating from the # specified source. The source is an address as specified in the # Messenger introduction with the following addition. If the # domain portion of the address begins with the '~' character, the # Messenger will interpret the domain as host/port, bind to it, # and listen for incoming messages. For example "~0.0.0.0", - # "amqp://~0.0.0.0" will all bind to any local interface and - # listen for incoming messages. Ad address of # "amqps://~0.0.0.0" + # "amqp://~0.0.0.0" will all bind to any local interface and + # listen for incoming messages. An address of "amqps://~0.0.0.0" # will only permit incoming SSL connections. # - def subscribe(address) + # ==== Options + # + # * address - the source address to be subscribe + # * timeout - an optional time-to-live value, in seconds, for the + # subscription + # + def subscribe(address, timeout=0) raise TypeError.new("invalid address: #{address}") if address.nil? - subscription = Cproton.pn_messenger_subscribe(@impl, address) + subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout) raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? Qpid::Proton::Subscription.new(subscription) end # Path to a certificate file for the +Messenger+. @@ -280,14 +300,14 @@ end # Places the content contained in the message onto the outgoing # queue of the Messenger. # - # This method will never block, however it will send any unblocked + # This method will never block, however it will send any unblocked # Messages in the outgoing queue immediately and leave any blocked # Messages remaining in the outgoing queue. - # The send call may then be used to block until the outgoing queue + # The send call may then be used to block until the outgoing queue # is empty. The outgoing attribute may be used to check the depth # of the outgoing queue. # # ==== Options # @@ -296,22 +316,31 @@ def put(message) raise TypeError.new("invalid message: #{message}") if message.nil? raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message) # encode the message first message.pre_encode - check_for_error(Cproton.pn_messenger_put(@impl, message.impl)) + perform_put(message) return outgoing_tracker end + private + + def perform_put(message) # :nodoc: + Cproton.pn_messenger_put(@impl, message.impl) + end + + public + + # This call will block until the indicated number of messages # have been sent, or until the operation times out. - # If n is -1 this call will block until all outgoing messages - # have been sent. If n is 0 then this call will send whatever + # If n is -1 this call will block until all outgoing messages + # have been sent. If n is 0 then this call will send whatever # it can without blocking. # def send(n = -1) - check_for_error(Cproton.pn_messenger_send(@impl, n)) + Cproton.pn_messenger_send(@impl, n) end # Moves the message from the head of the incoming message queue into # the supplied message object. Any content in the supplied message # will be overwritten. @@ -331,15 +360,23 @@ if msg.nil? then msg_impl = nil else msg_impl = msg.impl end - check_for_error(Cproton.pn_messenger_get(@impl, msg_impl)) + perform_get(msg_impl) msg.post_decode unless msg.nil? return incoming_tracker end + private + + def perform_get(msg) # :nodoc: + Cproton.pn_messenger_get(@impl, msg) + end + + public + # Receives up to limit messages into the incoming queue. If no value # for limit is supplied, this call will receive as many messages as it # can buffer internally. If the Messenger is in blocking mode, this # call will block until at least one Message is available in the # incoming queue. @@ -347,14 +384,15 @@ # Options ==== # # * limit - the maximum number of messages to receive # def receive(limit = -1) - check_for_error(Cproton.pn_messenger_recv(@impl, limit)) + Cproton.pn_messenger_recv(@impl, limit) end - def receiving + # Returns true if the messenger is currently receiving data. + def receiving? Cproton.pn_messenger_receiving(@impl) end # Attempts interrupting of the messenger thread. # @@ -367,19 +405,19 @@ # If there is no currently blocking call, then the next blocking # call will be affected, even if it is within the same thread that # originated the interrupt. # def interrupt - check_for_error(Cproton.pn_messenger_interrupt(@impl)) + Cproton.pn_messenger_interrupt(@impl) end # Sends or receives any outstanding messages queued for a Messenger. # # This will block for the indicated timeout. This method may also do I/O # other than sending and receiving messages. For example, closing # connections after stop() has been called. - # + # def work(timeout=-1) err = Cproton.pn_messenger_work(@impl, timeout) if (err == Cproton::PN_TIMEOUT) then return false else @@ -455,11 +493,11 @@ # # # route any address through a single broker # messenger.route("*", "amqp://user:password@broker/$1") # def route(pattern, address) - check_for_error(Cproton.pn_messenger_route(@impl, pattern, address)) + Cproton.pn_messenger_route(@impl, pattern, address) end # Similar to #route, except that the destination of # the Message is determined before the message address is rewritten. # @@ -477,11 +515,11 @@ # # * pattern - the outgoing address # * address - the target address # def rewrite(pattern, address) - check_for_error(Cproton.pn_messenger_rewrite(@impl, pattern, address)) + Cproton.pn_messenger_rewrite(@impl, pattern, address) end def selectable impl = Cproton.pn_messenger_selectable(@impl) @@ -531,11 +569,11 @@ tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end - check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag)) + Cproton.pn_messenger_accept(@impl, tracker.impl, flag) end # Rejects the incoming message identified by the tracker. # If no tracker is supplied, all messages that have been returned # by the get method are rejected, except those that have already @@ -551,17 +589,17 @@ tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end - check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag)) + Cproton.pn_messenger_reject(@impl, tracker.impl, flag) end # Gets the last known remote state of the delivery associated with - # the given tracker, as long as the Message is still within your - # outgoing window. (Also works on incoming messages that are still - # within your incoming queue. See TrackerStatus for details on the + # the given tracker, as long as the Message is still within your + # outgoing window. (Also works on incoming messages that are still + # within your incoming queue. See TrackerStatus for details on the # values returned. # # ==== Options # # * tracker - the tracker @@ -592,37 +630,37 @@ Cproton.pn_messenger_settle(@impl, tracker.impl, flag) end # Sets the incoming window. # - # The Messenger will track the remote status of this many incoming + # The Messenger will track the remote status of this many incoming # deliveries after they have been accepted or rejected. # # Messages enter this window only when you take them into your application # using get(). If your incoming window size is n, and you get n+1 messages # without explicitly accepting or rejecting the oldest message, then the - # message that passes beyond the edge of the incoming window will be + # message that passes beyond the edge of the incoming window will be # assigned the default disposition of its link. # # ==== Options # # * window - the window size # def incoming_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) - check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window)) + Cproton.pn_messenger_set_incoming_window(@impl, window) end # Returns the incoming window. # def incoming_window Cproton.pn_messenger_get_incoming_window(@impl) end # Sets the outgoing window. # - # The Messenger will track the remote status of this many outgoing + # The Messenger will track the remote status of this many outgoing # deliveries after calling send. # A Message enters this window when you call the put() method with the # message. If your outgoing window size is n, and you call put n+1 # times, status information will no longer be available for the # first message. @@ -631,10 +669,10 @@ # # * window - the window size # def outgoing_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) - check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window)) + Cproton.pn_messenger_set_outgoing_window(@impl, window) end # Returns the outgoing window. # def outgoing_window