lib/qpid_proton/messenger.rb in qpid_proton-0.4 vs lib/qpid_proton/messenger.rb in qpid_proton-0.5
- old
+ new
@@ -26,18 +26,10 @@
#
# ==== Examples
#
class Messenger
- # Automatically accept every message as it is returned by #get
- #
- ACCEPT_MODE_AUTO = Cproton::PN_ACCEPT_MODE_AUTO
-
- # Messages must be manually accepted or rejected using #accept
- #
- ACCEPT_MODE_MANUAL = Cproton::PN_ACCEPT_MODE_MANUAL
-
include Qpid::Proton::ExceptionHandling
# Creates a new +Messenger+.
#
# The +name+ parameter is optional. If one is not provided then
@@ -52,11 +44,10 @@
ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
end
def self.finalize!(impl) # :nodoc:
proc {
- Cproton.pn_messenger_stop(impl)
Cproton.pn_messenger_free(impl)
}
end
# Returns the name.
@@ -82,10 +73,18 @@
#
def timeout
Cproton.pn_messenger_get_timeout(@impl)
end
+ def blocking
+ Cproton.pn_mesenger_is_blocking(@impl)
+ end
+
+ def blocking=(blocking)
+ Cproton.pn_messenger_set_blocking(@impl, blocking)
+ end
+
# Reports whether an error occurred.
#
def error?
!Cproton.pn_messenger_errno(@impl).zero?
end
@@ -97,11 +96,11 @@
end
# Returns the most recent error message.
#
def error
- Cproton.pn_messenger_error(@impl)
+ Cproton.pn_error_text(Cproton.pn_messenger_error(@impl))
end
# Starts the +Messenger+, allowing it to begin sending and
# receiving messages.
#
@@ -114,10 +113,14 @@
#
def stop
check_for_error(Cproton.pn_messenger_stop(@impl))
end
+ def stopped
+ Cproton.pn_messenger_stopped(@impl)
+ end
+
# Subscribes the +Messenger+ to a remote address.
#
def subscribe(address)
raise TypeError.new("invalid address: #{address}") if address.nil?
subscription = Cproton.pn_messenger_subscribe(@impl, address)
@@ -191,18 +194,21 @@
# * message - the message
#
def put(message)
raise TypeError.new("invalid message: #{message}") if message.nil?
raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message)
+ # encode the message first
+ message.pre_encode
check_for_error(Cproton.pn_messenger_put(@impl, message.impl))
+ return outgoing_tracker
end
# Sends all outgoing messages, blocking until the outgoing queue
# is empty.
#
- def send
- check_for_error(Cproton.pn_messenger_send(@impl))
+ def send(n = -1)
+ check_for_error(Cproton.pn_messenger_send(@impl, n))
end
# Gets a single message incoming message from the local queue.
#
# If no message is provided in the argument, then one is created. In
@@ -211,28 +217,46 @@
# ==== Options
#
# * msg - the (optional) +Message+ instance to be used
#
def get(msg = nil)
- msg = Qpid::Proton::Message.new if msg.nil?
- check_for_error(Cproton.pn_messenger_get(@impl, msg.impl))
- return msg
+ msg_impl = nil
+ if msg.nil? then
+ msg_impl = nil
+ else
+ msg_impl = msg.impl
+ end
+ check_for_error(Cproton.pn_messenger_get(@impl, msg_impl))
+ msg.post_decode unless msg.nil?
+ return incoming_tracker
end
# Receives up to the specified number of messages, blocking until at least
# one message is received.
#
# Options ====
#
- # * max - the maximum number of messages to receive
+ # * limit - the maximum number of messages to receive
#
- def receive(max)
- raise TypeError.new("invalid max: #{max}") if max.nil? || max.to_i.zero?
- raise RangeError.new("negative max: #{max}") if max < 0
- check_for_error(Cproton.pn_messenger_recv(@impl, max))
+ def receive(limit = -1)
+ check_for_error(Cproton.pn_messenger_recv(@impl, limit))
end
+ def receiving
+ Cproton.pn_messenger_receiving(@impl)
+ end
+
+ def work(timeout=-1)
+ err = Cproton.pn_messenger_work(@impl, timeout)
+ if (err == Cproton::PN_TIMEOUT) then
+ return false
+ else
+ check_for_error(err)
+ return true
+ end
+ end
+
# Returns the number messages in the outgoing queue that have not been
# transmitted.
#
def outgoing
Cproton.pn_messenger_outgoing(@impl)
@@ -260,54 +284,43 @@
impl = Cproton.pn_messenger_incoming_tracker(@impl)
return nil if impl == -1
Qpid::Proton::Tracker.new(impl)
end
- # Set the accept mode for the Messenger. See #ACCEPT_MODE_AUTO and
- # #ACCEPT_MODE_MANUAL for more details
- #
- # ==== Options
- #
- # * mode - the acceptance mode
- #
- # ==== Examples
- #
- # @messenger.accept_mode = Qpid::Proton::Messenger::ACCEPT_MODE_AUTO
- #
- def accept_mode=(mode)
- raise TypeError.new("Invalid mode: #{mode}") unless valid_mode?(mode)
- Cproton.pn_messenger_set_accept_mode(@impl, mode)
- end
-
- # Returns the current acceptance mode for the Messenger.
- #
- def accept_mode
- Cproton.pn_messenger_get_accept_mode(@impl)
- end
-
# Accepts the incoming message identified by the tracker.
#
# ==== Options
#
# * tracker - the tracker
# * flag - the flag
#
- def accept(tracker, flag)
- raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
- raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag)
+ def accept(tracker = nil)
+ raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
+ if tracker.nil? then
+ tracker = self.incoming_tracker
+ flag = Cproton::PN_CUMULATIVE
+ else
+ flag = 0
+ end
check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag))
end
# Rejects the incoming message identified by the tracker.
#
# ==== Options
#
# * tracker - the tracker
# * flag - the flag
#
- def reject(tracker, flag)
- raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
+ def reject(tracker)
+ raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
+ if tracker.nil? then
+ tracker = self.incoming_tracker
+ flag = Cproton::PN_CUMULATIVE
+ else
+ flag = 0
+ end
check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag))
end
# Gets the last known remote state of the delivery associated with
# the given tracker. See TrackerStatus for details on the values
@@ -380,13 +393,9 @@
private
def valid_tracker?(tracker)
!tracker.nil? && tracker.is_a?(Qpid::Proton::Tracker)
- end
-
- def valid_mode?(mode)
- [ACCEPT_MODE_AUTO, ACCEPT_MODE_MANUAL].include?(mode)
end
def valid_window?(window)
!window.nil? && [Float, Fixnum].include?(window.class)
end