lib/qpid_proton/messenger.rb in qpid_proton-0.4 vs lib/qpid_proton/messenger.rb in qpid_proton-0.5

- old
+ new

@@ -26,18 +26,10 @@ # # ==== Examples # class Messenger - # Automatically accept every message as it is returned by #get - # - ACCEPT_MODE_AUTO = Cproton::PN_ACCEPT_MODE_AUTO - - # Messages must be manually accepted or rejected using #accept - # - ACCEPT_MODE_MANUAL = Cproton::PN_ACCEPT_MODE_MANUAL - include Qpid::Proton::ExceptionHandling # Creates a new +Messenger+. # # The +name+ parameter is optional. If one is not provided then @@ -52,11 +44,10 @@ ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end def self.finalize!(impl) # :nodoc: proc { - Cproton.pn_messenger_stop(impl) Cproton.pn_messenger_free(impl) } end # Returns the name. @@ -82,10 +73,18 @@ # def timeout Cproton.pn_messenger_get_timeout(@impl) end + def blocking + Cproton.pn_mesenger_is_blocking(@impl) + end + + def blocking=(blocking) + Cproton.pn_messenger_set_blocking(@impl, blocking) + end + # Reports whether an error occurred. # def error? !Cproton.pn_messenger_errno(@impl).zero? end @@ -97,11 +96,11 @@ end # Returns the most recent error message. # def error - Cproton.pn_messenger_error(@impl) + Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) end # Starts the +Messenger+, allowing it to begin sending and # receiving messages. # @@ -114,10 +113,14 @@ # def stop check_for_error(Cproton.pn_messenger_stop(@impl)) end + def stopped + Cproton.pn_messenger_stopped(@impl) + end + # Subscribes the +Messenger+ to a remote address. # def subscribe(address) raise TypeError.new("invalid address: #{address}") if address.nil? subscription = Cproton.pn_messenger_subscribe(@impl, address) @@ -191,18 +194,21 @@ # * message - the message # def put(message) raise TypeError.new("invalid message: #{message}") if message.nil? raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message) + # encode the message first + message.pre_encode check_for_error(Cproton.pn_messenger_put(@impl, message.impl)) + return outgoing_tracker end # Sends all outgoing messages, blocking until the outgoing queue # is empty. # - def send - check_for_error(Cproton.pn_messenger_send(@impl)) + def send(n = -1) + check_for_error(Cproton.pn_messenger_send(@impl, n)) end # Gets a single message incoming message from the local queue. # # If no message is provided in the argument, then one is created. In @@ -211,28 +217,46 @@ # ==== Options # # * msg - the (optional) +Message+ instance to be used # def get(msg = nil) - msg = Qpid::Proton::Message.new if msg.nil? - check_for_error(Cproton.pn_messenger_get(@impl, msg.impl)) - return msg + msg_impl = nil + if msg.nil? then + msg_impl = nil + else + msg_impl = msg.impl + end + check_for_error(Cproton.pn_messenger_get(@impl, msg_impl)) + msg.post_decode unless msg.nil? + return incoming_tracker end # Receives up to the specified number of messages, blocking until at least # one message is received. # # Options ==== # - # * max - the maximum number of messages to receive + # * limit - the maximum number of messages to receive # - def receive(max) - raise TypeError.new("invalid max: #{max}") if max.nil? || max.to_i.zero? - raise RangeError.new("negative max: #{max}") if max < 0 - check_for_error(Cproton.pn_messenger_recv(@impl, max)) + def receive(limit = -1) + check_for_error(Cproton.pn_messenger_recv(@impl, limit)) end + def receiving + Cproton.pn_messenger_receiving(@impl) + end + + def work(timeout=-1) + err = Cproton.pn_messenger_work(@impl, timeout) + if (err == Cproton::PN_TIMEOUT) then + return false + else + check_for_error(err) + return true + end + end + # Returns the number messages in the outgoing queue that have not been # transmitted. # def outgoing Cproton.pn_messenger_outgoing(@impl) @@ -260,54 +284,43 @@ impl = Cproton.pn_messenger_incoming_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end - # Set the accept mode for the Messenger. See #ACCEPT_MODE_AUTO and - # #ACCEPT_MODE_MANUAL for more details - # - # ==== Options - # - # * mode - the acceptance mode - # - # ==== Examples - # - # @messenger.accept_mode = Qpid::Proton::Messenger::ACCEPT_MODE_AUTO - # - def accept_mode=(mode) - raise TypeError.new("Invalid mode: #{mode}") unless valid_mode?(mode) - Cproton.pn_messenger_set_accept_mode(@impl, mode) - end - - # Returns the current acceptance mode for the Messenger. - # - def accept_mode - Cproton.pn_messenger_get_accept_mode(@impl) - end - # Accepts the incoming message identified by the tracker. # # ==== Options # # * tracker - the tracker # * flag - the flag # - def accept(tracker, flag) - raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) - raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag) + def accept(tracker = nil) + raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) + if tracker.nil? then + tracker = self.incoming_tracker + flag = Cproton::PN_CUMULATIVE + else + flag = 0 + end check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag)) end # Rejects the incoming message identified by the tracker. # # ==== Options # # * tracker - the tracker # * flag - the flag # - def reject(tracker, flag) - raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) + def reject(tracker) + raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) + if tracker.nil? then + tracker = self.incoming_tracker + flag = Cproton::PN_CUMULATIVE + else + flag = 0 + end check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag)) end # Gets the last known remote state of the delivery associated with # the given tracker. See TrackerStatus for details on the values @@ -380,13 +393,9 @@ private def valid_tracker?(tracker) !tracker.nil? && tracker.is_a?(Qpid::Proton::Tracker) - end - - def valid_mode?(mode) - [ACCEPT_MODE_AUTO, ACCEPT_MODE_MANUAL].include?(mode) end def valid_window?(window) !window.nil? && [Float, Fixnum].include?(window.class) end