lib/qpid_proton/messenger.rb in qpid_proton-0.7 vs lib/qpid_proton/messenger.rb in qpid_proton-0.7.1
- old
+ new
@@ -34,23 +34,23 @@
#
# === Sending & Receiving Messages
#
# The Messenger class works in conjuction with the Message class. The
# Message class is a mutable holder of message content.
- #
- # The put method copies its Message to the outgoing queue, and may
+ #
+ # The put method copies its Message to the outgoing queue, and may
# send queued messages if it can do so without blocking. The send
# method blocks until it has sent the requested number of messages,
# or until a timeout interrupts the attempt.
- #
+ #
# Similarly, the recv method receives messages into the incoming
# queue, and may block as it attempts to receive the requested number
# of messages, or until timeout is reached. It may receive fewer
# than the requested number. The get method pops the
# eldest Message off the incoming queue and copies it into the Message
# object that you supply. It will not block.
- #
+ #
# The blocking attribute allows you to turn off blocking behavior entirely,
# in which case send and recv will do whatever they can without
# blocking, and then return. You can then look at the number
# of incoming and outgoing messages to see how much outstanding work
# still remains.
@@ -68,10 +68,11 @@
#
# * name - the name (def. nil)
#
def initialize(name = nil)
@impl = Cproton.pn_messenger(name)
+ @selectables = {}
ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
end
def self.finalize!(impl) # :nodoc:
proc {
@@ -134,10 +135,34 @@
# Sets the blocking mode.
def blocking=(blocking)
Cproton.pn_messenger_set_blocking(@impl, blocking)
end
+ # Returns true if passive mode is enabled.
+ #
+ def passive?
+ Cproton.pn_messenger_is_passive(@impl)
+ end
+
+ # Turns passive mode on or off.
+ #
+ # When set to passive mode, Messenger will not attempt to perform I/O
+ # operations internally. In this mode it is necesssary to use the
+ # Selectable type to drive any I/O needed to perform requestioned
+ # actions.
+ #
+ # In this mode Messenger will never block.
+ #
+ def passive=(mode)
+ Cproton.pn_messenger_set_passive(@impl, mode)
+ end
+
+ def deadline
+ tstamp = Cproton.pn_messenger_deadline(@impl)
+ return tstamp / 1000.0 unless tstamp.nil?
+ end
+
# Reports whether an error occurred.
#
def error?
!Cproton.pn_messenger_errno(@impl).zero?
end
@@ -455,10 +480,26 @@
#
def rewrite(pattern, address)
check_for_error(Cproton.pn_messenger_rewrite(@impl, pattern, address))
end
+ def selectable
+ impl = Cproton.pn_messenger_selectable(@impl)
+
+ # if we don't have any selectables, then return
+ return nil if impl.nil?
+
+ fd = Cproton.pn_selectable_fd(impl)
+
+ selectable = @selectables[fd]
+ if selectable.nil?
+ selectable = Selectable.new(self, impl)
+ @selectables[fd] = selectable
+ end
+ return selectable
+ end
+
# Returns a +Tracker+ for the message most recently sent via the put
# method.
#
def outgoing_tracker
impl = Cproton.pn_messenger_outgoing_tracker(@impl)
@@ -597,9 +638,14 @@
# Returns the outgoing window.
#
def outgoing_window
Cproton.pn_messenger_get_outgoing_window(@impl)
+ end
+
+ # Unregisters a selectable object.
+ def unregister_selectable(fileno) # :nodoc:
+ @selectables.delete(fileno)
end
private
def valid_tracker?(tracker)