lib/qpid_proton/messenger.rb in qpid_proton-0.8 vs lib/qpid_proton/messenger.rb in qpid_proton-0.9.0
- old
+ new
@@ -15,13 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
-module Qpid
+module Qpid # :nodoc:
- module Proton
+ module Proton # :nodoc:
# The +Messenger+ class defines a high level interface for
# sending and receiving Messages. Every Messenger contains
# a single logical queue of incoming messages and a single
# logical queue of outgoing messages. These messages in these
@@ -57,10 +57,15 @@
#
class Messenger
include Qpid::Proton::ExceptionHandling
+ can_raise_exception [:send, :receive, :password=, :start, :stop,
+ :perform_put, :perform_get, :interrupt,
+ :route, :rewrite, :accept, :reject,
+ :incoming_window=, :outgoing_window=]
+
# Creates a new +Messenger+.
#
# The +name+ parameter is optional. If one is not provided then
# a unique name is generated.
#
@@ -92,11 +97,11 @@
# ==== Arguments
#
# * password - the password
#
def password=(password)
- check_for_error(Cproton.pn_messenger_set_password(@impl, password))
+ Cproton.pn_messenger_set_password(@impl, password)
end
# Returns the password property for the Messenger.private_key file.
#
def password
@@ -177,45 +182,60 @@
#
def error
Cproton.pn_error_text(Cproton.pn_messenger_error(@impl))
end
+ # Clears the current error state.
+ #
+ def clear_error
+ error = Cproton.pn_messenger_error(@impl)
+ unless error.nil?
+ Cproton.pn_error_clear(error)
+ end
+ end
+
# Currently a no-op placeholder.
# For future compatibility, do not send or recv messages
# before starting the +Messenger+.
#
def start
- check_for_error(Cproton.pn_messenger_start(@impl))
+ Cproton.pn_messenger_start(@impl)
end
# Stops the +Messenger+, preventing it from sending or receiving
# any more messages.
#
def stop
- check_for_error(Cproton.pn_messenger_stop(@impl))
+ Cproton.pn_messenger_stop(@impl)
end
- # Returns true iff a Messenger is in the stopped state.
+ # Returns true if a Messenger is in the stopped state.
# This function does not block.
#
- def stopped
+ def stopped?
Cproton.pn_messenger_stopped(@impl)
end
# Subscribes the Messenger to messages originating from the
# specified source. The source is an address as specified in the
# Messenger introduction with the following addition. If the
# domain portion of the address begins with the '~' character, the
# Messenger will interpret the domain as host/port, bind to it,
# and listen for incoming messages. For example "~0.0.0.0",
- # "amqp://~0.0.0.0" will all bind to any local interface and
- # listen for incoming messages. Ad address of # "amqps://~0.0.0.0"
+ # "amqp://~0.0.0.0" will all bind to any local interface and
+ # listen for incoming messages. An address of "amqps://~0.0.0.0"
# will only permit incoming SSL connections.
#
- def subscribe(address)
+ # ==== Options
+ #
+ # * address - the source address to be subscribe
+ # * timeout - an optional time-to-live value, in seconds, for the
+ # subscription
+ #
+ def subscribe(address, timeout=0)
raise TypeError.new("invalid address: #{address}") if address.nil?
- subscription = Cproton.pn_messenger_subscribe(@impl, address)
+ subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout)
raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil?
Qpid::Proton::Subscription.new(subscription)
end
# Path to a certificate file for the +Messenger+.
@@ -280,14 +300,14 @@
end
# Places the content contained in the message onto the outgoing
# queue of the Messenger.
#
- # This method will never block, however it will send any unblocked
+ # This method will never block, however it will send any unblocked
# Messages in the outgoing queue immediately and leave any blocked
# Messages remaining in the outgoing queue.
- # The send call may then be used to block until the outgoing queue
+ # The send call may then be used to block until the outgoing queue
# is empty. The outgoing attribute may be used to check the depth
# of the outgoing queue.
#
# ==== Options
#
@@ -296,22 +316,31 @@
def put(message)
raise TypeError.new("invalid message: #{message}") if message.nil?
raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message)
# encode the message first
message.pre_encode
- check_for_error(Cproton.pn_messenger_put(@impl, message.impl))
+ perform_put(message)
return outgoing_tracker
end
+ private
+
+ def perform_put(message) # :nodoc:
+ Cproton.pn_messenger_put(@impl, message.impl)
+ end
+
+ public
+
+
# This call will block until the indicated number of messages
# have been sent, or until the operation times out.
- # If n is -1 this call will block until all outgoing messages
- # have been sent. If n is 0 then this call will send whatever
+ # If n is -1 this call will block until all outgoing messages
+ # have been sent. If n is 0 then this call will send whatever
# it can without blocking.
#
def send(n = -1)
- check_for_error(Cproton.pn_messenger_send(@impl, n))
+ Cproton.pn_messenger_send(@impl, n)
end
# Moves the message from the head of the incoming message queue into
# the supplied message object. Any content in the supplied message
# will be overwritten.
@@ -331,15 +360,23 @@
if msg.nil? then
msg_impl = nil
else
msg_impl = msg.impl
end
- check_for_error(Cproton.pn_messenger_get(@impl, msg_impl))
+ perform_get(msg_impl)
msg.post_decode unless msg.nil?
return incoming_tracker
end
+ private
+
+ def perform_get(msg) # :nodoc:
+ Cproton.pn_messenger_get(@impl, msg)
+ end
+
+ public
+
# Receives up to limit messages into the incoming queue. If no value
# for limit is supplied, this call will receive as many messages as it
# can buffer internally. If the Messenger is in blocking mode, this
# call will block until at least one Message is available in the
# incoming queue.
@@ -347,14 +384,15 @@
# Options ====
#
# * limit - the maximum number of messages to receive
#
def receive(limit = -1)
- check_for_error(Cproton.pn_messenger_recv(@impl, limit))
+ Cproton.pn_messenger_recv(@impl, limit)
end
- def receiving
+ # Returns true if the messenger is currently receiving data.
+ def receiving?
Cproton.pn_messenger_receiving(@impl)
end
# Attempts interrupting of the messenger thread.
#
@@ -367,19 +405,19 @@
# If there is no currently blocking call, then the next blocking
# call will be affected, even if it is within the same thread that
# originated the interrupt.
#
def interrupt
- check_for_error(Cproton.pn_messenger_interrupt(@impl))
+ Cproton.pn_messenger_interrupt(@impl)
end
# Sends or receives any outstanding messages queued for a Messenger.
#
# This will block for the indicated timeout. This method may also do I/O
# other than sending and receiving messages. For example, closing
# connections after stop() has been called.
- #
+ #
def work(timeout=-1)
err = Cproton.pn_messenger_work(@impl, timeout)
if (err == Cproton::PN_TIMEOUT) then
return false
else
@@ -455,11 +493,11 @@
#
# # route any address through a single broker
# messenger.route("*", "amqp://user:password@broker/$1")
#
def route(pattern, address)
- check_for_error(Cproton.pn_messenger_route(@impl, pattern, address))
+ Cproton.pn_messenger_route(@impl, pattern, address)
end
# Similar to #route, except that the destination of
# the Message is determined before the message address is rewritten.
#
@@ -477,11 +515,11 @@
#
# * pattern - the outgoing address
# * address - the target address
#
def rewrite(pattern, address)
- check_for_error(Cproton.pn_messenger_rewrite(@impl, pattern, address))
+ Cproton.pn_messenger_rewrite(@impl, pattern, address)
end
def selectable
impl = Cproton.pn_messenger_selectable(@impl)
@@ -531,11 +569,11 @@
tracker = self.incoming_tracker
flag = Cproton::PN_CUMULATIVE
else
flag = 0
end
- check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag))
+ Cproton.pn_messenger_accept(@impl, tracker.impl, flag)
end
# Rejects the incoming message identified by the tracker.
# If no tracker is supplied, all messages that have been returned
# by the get method are rejected, except those that have already
@@ -551,17 +589,17 @@
tracker = self.incoming_tracker
flag = Cproton::PN_CUMULATIVE
else
flag = 0
end
- check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag))
+ Cproton.pn_messenger_reject(@impl, tracker.impl, flag)
end
# Gets the last known remote state of the delivery associated with
- # the given tracker, as long as the Message is still within your
- # outgoing window. (Also works on incoming messages that are still
- # within your incoming queue. See TrackerStatus for details on the
+ # the given tracker, as long as the Message is still within your
+ # outgoing window. (Also works on incoming messages that are still
+ # within your incoming queue. See TrackerStatus for details on the
# values returned.
#
# ==== Options
#
# * tracker - the tracker
@@ -592,37 +630,37 @@
Cproton.pn_messenger_settle(@impl, tracker.impl, flag)
end
# Sets the incoming window.
#
- # The Messenger will track the remote status of this many incoming
+ # The Messenger will track the remote status of this many incoming
# deliveries after they have been accepted or rejected.
#
# Messages enter this window only when you take them into your application
# using get(). If your incoming window size is n, and you get n+1 messages
# without explicitly accepting or rejecting the oldest message, then the
- # message that passes beyond the edge of the incoming window will be
+ # message that passes beyond the edge of the incoming window will be
# assigned the default disposition of its link.
#
# ==== Options
#
# * window - the window size
#
def incoming_window=(window)
raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
- check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window))
+ Cproton.pn_messenger_set_incoming_window(@impl, window)
end
# Returns the incoming window.
#
def incoming_window
Cproton.pn_messenger_get_incoming_window(@impl)
end
# Sets the outgoing window.
#
- # The Messenger will track the remote status of this many outgoing
+ # The Messenger will track the remote status of this many outgoing
# deliveries after calling send.
# A Message enters this window when you call the put() method with the
# message. If your outgoing window size is n, and you call put n+1
# times, status information will no longer be available for the
# first message.
@@ -631,10 +669,10 @@
#
# * window - the window size
#
def outgoing_window=(window)
raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
- check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window))
+ Cproton.pn_messenger_set_outgoing_window(@impl, window)
end
# Returns the outgoing window.
#
def outgoing_window