lib/qpid_proton/messenger.rb in qpid_proton-0.5 vs lib/qpid_proton/messenger.rb in qpid_proton-0.6

- old
+ new

@@ -19,15 +19,44 @@ module Qpid module Proton - # A +Messenger+ provides a high-level means for sending and - # receiving AMQP messages. + # The +Messenger+ class defines a high level interface for + # sending and receiving Messages. Every Messenger contains + # a single logical queue of incoming messages and a single + # logical queue of outgoing messages. These messages in these + # queues may be destined for, or originate from, a variety of + # addresses. # - # ==== Examples + # The messenger interface is single-threaded. All methods + # except one ( #interrupt ) are intended to be used from within + # the messenger thread. # + # === Sending & Receiving Messages + # + # The Messenger class works in conjuction with the Message class. The + # Message class is a mutable holder of message content. + # + # The put method copies its Message to the outgoing queue, and may + # send queued messages if it can do so without blocking. The send + # method blocks until it has sent the requested number of messages, + # or until a timeout interrupts the attempt. + # + # Similarly, the recv method receives messages into the incoming + # queue, and may block as it attempts to receive the requested number + # of messages, or until timeout is reached. It may receive fewer + # than the requested number. The get method pops the + # eldest Message off the incoming queue and copies it into the Message + # object that you supply. It will not block. + # + # The blocking attribute allows you to turn off blocking behavior entirely, + # in which case send and recv will do whatever they can without + # blocking, and then return. You can then look at the number + # of incoming and outgoing messages to see how much outstanding work + # still remains. + # class Messenger include Qpid::Proton::ExceptionHandling # Creates a new +Messenger+. @@ -54,10 +83,27 @@ # def name Cproton.pn_messenger_name(@impl) end + # This property contains the password for the Messenger.private_key + # file, or +nil+ if the file is not encrypted. + # + # ==== Arguments + # + # * password - the password + # + def password=(password) + check_for_error(Cproton.pn_messenger_set_password(@impl, password)) + end + + # Returns the password property for the Messenger.private_key file. + # + def password + Cproton.pn_messenger_get_password(@impl) + end + # Sets the timeout period, in milliseconds. # # A negative timeout period implies an infinite timeout. # # ==== Options @@ -73,10 +119,16 @@ # def timeout Cproton.pn_messenger_get_timeout(@impl) end + # Blocking Attribute + # + # Enable or disable blocking behavior during message sending + # and receiving. This affects every blocking call, with the + # exception of work(). Currently, the affected calls are + # send, recv, and stop. def blocking Cproton.pn_mesenger_is_blocking(@impl) end def blocking=(blocking) @@ -99,12 +151,13 @@ # def error Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) end - # Starts the +Messenger+, allowing it to begin sending and - # receiving messages. + # Currently a no-op placeholder. + # For future compatibility, do not send or recv messages + # before starting the +Messenger+. # def start check_for_error(Cproton.pn_messenger_start(@impl)) end @@ -113,15 +166,26 @@ # def stop check_for_error(Cproton.pn_messenger_stop(@impl)) end + # Returns true iff a Messenger is in the stopped state. + # This function does not block. + # def stopped Cproton.pn_messenger_stopped(@impl) end - # Subscribes the +Messenger+ to a remote address. + # Subscribes the Messenger to messages originating from the + # specified source. The source is an address as specified in the + # Messenger introduction with the following addition. If the + # domain portion of the address begins with the '~' character, the + # Messenger will interpret the domain as host/port, bind to it, + # and listen for incoming messages. For example "~0.0.0.0", + # "amqp://~0.0.0.0" will all bind to any local interface and + # listen for incoming messages. Ad address of # "amqps://~0.0.0.0" + # will only permit incoming SSL connections. # def subscribe(address) raise TypeError.new("invalid address: #{address}") if address.nil? subscription = Cproton.pn_messenger_subscribe(@impl, address) raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? @@ -129,11 +193,14 @@ end # Path to a certificate file for the +Messenger+. # # This certificate is used when the +Messenger+ accepts or establishes - # SSL/TLS connections. + # SSL/TLS connections. This property must be specified for the + # Messenger to accept incoming SSL/TLS connections and to establish + # client authenticated outgoing SSL/TLS connection. Non client authenticated + # outgoing SSL/TLS connections do not require this property. # # ==== Options # # * certificate - the certificate # @@ -149,11 +216,12 @@ # Path to a private key file for the +Messenger+. # # The property must be specified for the +Messenger+ to accept incoming # SSL/TLS connections and to establish client authenticated outgoing - # SSL/TLS connections. + # SSL/TLS connections. Non client authenticated SSL/TLS connections + # do not require this property. # # ==== Options # # * key - the key file # @@ -183,13 +251,19 @@ # def trusted_certificates Cproton.pn_messenger_get_trusted_certificates(@impl) end - # Puts a single message into the outgoing queue. + # Places the content contained in the message onto the outgoing + # queue of the Messenger. # - # To ensure messages are sent, you should then call ::send. + # This method will never block, however it will send any unblocked + # Messages in the outgoing queue immediately and leave any blocked + # Messages remaining in the outgoing queue. + # The send call may then be used to block until the outgoing queue + # is empty. The outgoing attribute may be used to check the depth + # of the outgoing queue. # # ==== Options # # * message - the message # @@ -200,18 +274,26 @@ message.pre_encode check_for_error(Cproton.pn_messenger_put(@impl, message.impl)) return outgoing_tracker end - # Sends all outgoing messages, blocking until the outgoing queue - # is empty. + # This call will block until the indicated number of messages + # have been sent, or until the operation times out. + # If n is -1 this call will block until all outgoing messages + # have been sent. If n is 0 then this call will send whatever + # it can without blocking. # def send(n = -1) check_for_error(Cproton.pn_messenger_send(@impl, n)) end - # Gets a single message incoming message from the local queue. + # Moves the message from the head of the incoming message queue into + # the supplied message object. Any content in the supplied message + # will be overwritten. + # A tracker for the incoming Message is returned. The tracker can + # later be used to communicate your acceptance or rejection of the + # Message. # # If no message is provided in the argument, then one is created. In # either case, the one returned will be the fetched message. # # ==== Options @@ -228,12 +310,15 @@ check_for_error(Cproton.pn_messenger_get(@impl, msg_impl)) msg.post_decode unless msg.nil? return incoming_tracker end - # Receives up to the specified number of messages, blocking until at least - # one message is received. + # Receives up to limit messages into the incoming queue. If no value + # for limit is supplied, this call will receive as many messages as it + # can buffer internally. If the Messenger is in blocking mode, this + # call will block until at least one Message is available in the + # incoming queue. # # Options ==== # # * limit - the maximum number of messages to receive # @@ -243,10 +328,32 @@ def receiving Cproton.pn_messenger_receiving(@impl) end + # Attempts interrupting of the messenger thread. + # + # The Messenger interface is single-threaded, and this is the only + # function intended to be called from outside of is thread. + # + # Call this from a non-Messenger thread to interrupt it while it + # is blocking. This will cause a ::InterruptError to be raised. + # + # If there is no currently blocking call, then the next blocking + # call will be affected, even if it is within the same thread that + # originated the interrupt. + # + def interrupt + check_for_error(Cproton.pn_messenger_interrupt(@impl)) + end + + # Sends or receives any outstanding messages queued for a Messenger. + # + # This will block for the indicated timeout. This method may also do I/O + # other than sending and receiving messages. For example, closing + # connections after stop() has been called. + # def work(timeout=-1) err = Cproton.pn_messenger_work(@impl, timeout) if (err == Cproton::PN_TIMEOUT) then return false else @@ -267,10 +374,90 @@ # def incoming Cproton.pn_messenger_incoming(@impl) end + # Adds a routing rule to the Messenger's internal routing table. + # + # The route procedure may be used to influence how a Messenger will + # internally treat a given address or class of addresses. Every call + # to the route procedure will result in Messenger appending a routing + # rule to its internal routing table. + # + # Whenever a Message is presented to a Messenger for delivery, it + # will match the address of this message against the set of routing + # rules in order. The first rule to match will be triggered, and + # instead of routing based on the address presented in the message, + # the Messenger will route based on the address supplied in the rule. + # + # The pattern matching syntax supports two types of matches, a '%' + # will match any character except a '/', and a '*' will match any + # character including a '/'. + # + # A routing address is specified as a normal AMQP address, however it + # may additionally use substitution variables from the pattern match + # that triggered the rule. + # + # ==== Arguments + # + # * pattern - the address pattern + # * address - the target address + # + # ==== Examples + # + # # route messages sent to foo to the destionaty amqp://foo.com + # messenger.route("foo", "amqp://foo.com") + # + # # any message to foobar will be routed to amqp://foo.com/bar + # messenger.route("foobar", "amqp://foo.com/bar") + # + # # any message to bar/<path> will be routed to the same path within + # # the amqp://bar.com domain + # messenger.route("bar/*", "amqp://bar.com/$1") + # + # # route all Message objects over TLS + # messenger.route("amqp:*", "amqps:$1") + # + # # supply credentials for foo + # messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1") + # + # # supply credentials for all domains + # messenger.route("amqp://*", "amqp://user:password@$1") + # + # # route all addresses through a single proxy while preserving the + # # original destination + # messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2") + # + # # route any address through a single broker + # messenger.route("*", "amqp://user:password@broker/$1") + # + def route(pattern, address) + check_for_error(Cproton.pn_messenger_route(@impl, pattern, address)) + end + + # Similar to #route, except that the destination of + # the Message is determined before the message address is rewritten. + # + # The outgoing address is only rewritten after routing has been + # finalized. If a message has an outgoing address of + # "amqp://0.0.0.0:5678", and a rewriting rule that changes its + # outgoing address to "foo", it will still arrive at the peer that + # is listening on "amqp://0.0.0.0:5678", but when it arrives there, + # the receiver will see its outgoing address as "foo". + # + # The default rewrite rule removes username and password from addresses + # before they are transmitted. + # + # ==== Arguments + # + # * pattern - the outgoing address + # * address - the target address + # + def rewrite(pattern, address) + check_for_error(Cproton.pn_messenger_rewrite(@impl, pattern, address)) + end + # Returns a +Tracker+ for the message most recently sent via the put # method. # def outgoing_tracker impl = Cproton.pn_messenger_outgoing_tracker(@impl) @@ -284,16 +471,19 @@ impl = Cproton.pn_messenger_incoming_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end - # Accepts the incoming message identified by the tracker. + # Signal the sender that you have acted on the Message + # pointed to by the tracker. If no tracker is supplied, + # then all messages that have been returned by the get + # method are accepted, except those that have already been + # auto-settled by passing beyond your incoming window size. # # ==== Options # # * tracker - the tracker - # * flag - the flag # def accept(tracker = nil) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker @@ -303,15 +493,17 @@ end check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag)) end # Rejects the incoming message identified by the tracker. + # If no tracker is supplied, all messages that have been returned + # by the get method are rejected, except those that have already + # been auto-settled by passing beyond your outgoing window size. # # ==== Options # # * tracker - the tracker - # * flag - the flag # def reject(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker @@ -321,43 +513,56 @@ end check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag)) end # Gets the last known remote state of the delivery associated with - # the given tracker. See TrackerStatus for details on the values - # returned. + # the given tracker, as long as the Message is still within your + # outgoing window. (Also works on incoming messages that are still + # within your incoming queue. See TrackerStatus for details on the + # values returned. # # ==== Options # # * tracker - the tracker # def status(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) end - # Settles messages for a tracker. + # Frees a Messenger from tracking the status associated + # with a given tracker. If you don't supply a tracker, all + # outgoing messages up to the most recent will be settled. # # ==== Options # # * tracker - the tracker - # * flag - the flag # # ==== Examples # - def settle(tracker, flag) + def settle(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) - raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag) + if tracker.nil? then + tracker = self.incoming_tracker + flag = Cproton::PN_CUMULATIVE + else + flag = 0 + end Cproton.pn_messenger_settle(@impl, tracker.impl, flag) end # Sets the incoming window. # - # If the incoming window is set to a positive value, then after each - # call to #accept or #reject, the object will track the status of that - # many deliveries. + # The Messenger will track the remote status of this many incoming + # deliveries after they have been accepted or rejected. # + # Messages enter this window only when you take them into your application + # using get(). If your incoming window size is n, and you get n+1 messages + # without explicitly accepting or rejecting the oldest message, then the + # message that passes beyond the edge of the incoming window will be + # assigned the default disposition of its link. + # # ==== Options # # * window - the window size # def incoming_window=(window) @@ -369,13 +574,17 @@ # def incoming_window Cproton.pn_messenger_get_incoming_window(@impl) end - #Sets the outgoing window. + # Sets the outgoing window. # - # If the outgoing window is set to a positive value, then after each call - # to #send, the object will track the status of that many deliveries. + # The Messenger will track the remote status of this many outgoing + # deliveries after calling send. + # A Message enters this window when you call the put() method with the + # message. If your outgoing window size is n, and you call put n+1 + # times, status information will no longer be available for the + # first message. # # ==== Options # # * window - the window size #