lib/qpid_proton/messenger.rb in qpid_proton-0.7 vs lib/qpid_proton/messenger.rb in qpid_proton-0.7.1

- old
+ new

@@ -34,23 +34,23 @@ # # === Sending & Receiving Messages # # The Messenger class works in conjuction with the Message class. The # Message class is a mutable holder of message content. - # - # The put method copies its Message to the outgoing queue, and may + # + # The put method copies its Message to the outgoing queue, and may # send queued messages if it can do so without blocking. The send # method blocks until it has sent the requested number of messages, # or until a timeout interrupts the attempt. - # + # # Similarly, the recv method receives messages into the incoming # queue, and may block as it attempts to receive the requested number # of messages, or until timeout is reached. It may receive fewer # than the requested number. The get method pops the # eldest Message off the incoming queue and copies it into the Message # object that you supply. It will not block. - # + # # The blocking attribute allows you to turn off blocking behavior entirely, # in which case send and recv will do whatever they can without # blocking, and then return. You can then look at the number # of incoming and outgoing messages to see how much outstanding work # still remains. @@ -68,10 +68,11 @@ # # * name - the name (def. nil) # def initialize(name = nil) @impl = Cproton.pn_messenger(name) + @selectables = {} ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end def self.finalize!(impl) # :nodoc: proc { @@ -134,10 +135,34 @@ # Sets the blocking mode. def blocking=(blocking) Cproton.pn_messenger_set_blocking(@impl, blocking) end + # Returns true if passive mode is enabled. + # + def passive? + Cproton.pn_messenger_is_passive(@impl) + end + + # Turns passive mode on or off. + # + # When set to passive mode, Messenger will not attempt to perform I/O + # operations internally. In this mode it is necesssary to use the + # Selectable type to drive any I/O needed to perform requestioned + # actions. + # + # In this mode Messenger will never block. + # + def passive=(mode) + Cproton.pn_messenger_set_passive(@impl, mode) + end + + def deadline + tstamp = Cproton.pn_messenger_deadline(@impl) + return tstamp / 1000.0 unless tstamp.nil? + end + # Reports whether an error occurred. # def error? !Cproton.pn_messenger_errno(@impl).zero? end @@ -455,10 +480,26 @@ # def rewrite(pattern, address) check_for_error(Cproton.pn_messenger_rewrite(@impl, pattern, address)) end + def selectable + impl = Cproton.pn_messenger_selectable(@impl) + + # if we don't have any selectables, then return + return nil if impl.nil? + + fd = Cproton.pn_selectable_fd(impl) + + selectable = @selectables[fd] + if selectable.nil? + selectable = Selectable.new(self, impl) + @selectables[fd] = selectable + end + return selectable + end + # Returns a +Tracker+ for the message most recently sent via the put # method. # def outgoing_tracker impl = Cproton.pn_messenger_outgoing_tracker(@impl) @@ -597,9 +638,14 @@ # Returns the outgoing window. # def outgoing_window Cproton.pn_messenger_get_outgoing_window(@impl) + end + + # Unregisters a selectable object. + def unregister_selectable(fileno) # :nodoc: + @selectables.delete(fileno) end private def valid_tracker?(tracker)