lib/wamp/client/session.rb in wamp_client-0.1.3 vs lib/wamp/client/session.rb in wamp_client-0.1.4
- old
+ new
@@ -159,11 +159,11 @@
end
end
class Session
- include Wamp::Client::Check
+ include Check
# on_join callback is called when the session joins the router. It has the following parameters
# @param details [Hash] Object containing information about the joined session
@on_join
def on_join(&on_join)
@@ -206,11 +206,11 @@
# Private attributes
attr_accessor :_goodbye_sent, :_requests, :_subscriptions, :_registrations, :_defers
# Constructor
- # @param transport [Wamp::Client::Transport::Base] The transport that the session will use
+ # @param transport [Transport::Base] The transport that the session will use
# @param options [Hash] Hash containing different session options
# @option options [String] :authid The authentication ID
# @option options [Array] :authmethods Different auth methods that this client supports
def initialize(transport, options={})
@@ -266,16 +266,16 @@
self.realm = realm
details = {}
details[:roles] = WAMP_FEATURES
- details[:agent] = "Ruby-Wamp::Client-#{Wamp::Client::VERSION}"
+ details[:agent] = "Ruby-Wamp::Client-#{VERSION}"
details[:authid] = self.options[:authid] if self.options[:authid]
details[:authmethods] = self.options[:authmethods] if self.options[:authmethods]
# Send Hello message
- hello = Wamp::Client::Message::Hello.new(realm, details)
+ hello = Message::Hello.new(realm, details)
self._send_message(hello)
end
# Leaves the WAMP Router
# @param reason [String] URI signalling the reason for leaving
@@ -289,32 +289,32 @@
details = {}
details[:message] = message
# Send Goodbye message
- goodbye = Wamp::Client::Message::Goodbye.new(details, reason)
+ goodbye = Message::Goodbye.new(details, reason)
self._send_message(goodbye)
self._goodbye_sent = true
end
# Generates an ID according to the specification (Section 5.1.2)
def _generate_id
rand(0..9007199254740992)
end
# Converts and error message to a hash
- # @param msg [Wamp::Client::Message::Error]
+ # @param msg [Message::Error]
def _error_to_hash(msg)
{
error: msg.error,
args: msg.arguments,
kwargs: msg.argumentskw
}
end
# Sends a message
- # @param msg [Wamp::Client::Message::Base]
+ # @param msg [Message::Base]
def _send_message(msg)
# Log the message
logger.debug("#{self.class.name} TX: #{msg.to_s}")
# Send it to the transport
@@ -327,29 +327,29 @@
# Print the raw message
logger.debug("#{self.class.name} RX(raw): #{msg.to_s}")
# Parse the WAMP message
- message = Wamp::Client::Message.parse(msg)
+ message = Message.parse(msg)
# Print the parsed WAMP message
logger.debug("#{self.class.name} RX: #{message.to_s}")
# WAMP Session is not open
if self.id.nil?
# Parse the welcome message
- if message.is_a? Wamp::Client::Message::Welcome
+ if message.is_a? Message::Welcome
# Get the session ID
self.id = message.session
# Log joining the session
logger.info("#{self.class.name} joined session with realm '#{message.details[:realm]}'")
# Call the callback if it is set
@on_join.call(message.details) unless @on_join.nil?
- elsif message.is_a? Wamp::Client::Message::Challenge
+ elsif message.is_a? Message::Challenge
# Log challenge received
logger.debug("#{self.class.name} auth challenge '#{message.authmethod}', extra: #{message.extra}")
# Call the callback if set
if @on_challenge
@@ -360,14 +360,14 @@
end
signature ||= ''
extra ||= {}
- authenticate = Wamp::Client::Message::Authenticate.new(signature, extra)
+ authenticate = Message::Authenticate.new(signature, extra)
self._send_message(authenticate)
- elsif message.is_a? Wamp::Client::Message::Abort
+ elsif message.is_a? Message::Abort
# Log leaving the session
logger.info("#{self.class.name} left session '#{message.reason}'")
# Call the callback if it is set
@on_leave.call(message.reason, message.details) unless @on_leave.nil?
@@ -375,15 +375,15 @@
# Wamp Session is open
else
# If goodbye, close the session
- if message.is_a? Wamp::Client::Message::Goodbye
+ if message.is_a? Message::Goodbye
# If we didn't send the goodbye, respond
unless self._goodbye_sent
- goodbye = Wamp::Client::Message::Goodbye.new({}, 'wamp.error.goodbye_and_out')
+ goodbye = Message::Goodbye.new({}, 'wamp.error.goodbye_and_out')
self._send_message(goodbye)
end
# Close out session
self.id = nil
@@ -426,16 +426,16 @@
# Create a new subscribe request
request = self._generate_id
self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback}
# Send the message
- subscribe = Wamp::Client::Message::Subscribe.new(request, options, topic)
+ subscribe = Message::Subscribe.new(request, options, topic)
self._send_message(subscribe)
end
# Processes the response to a subscribe request
- # @param msg [Wamp::Client::Message::Subscribed] The response from the subscribe
+ # @param msg [Message::Subscribed] The response from the subscribe
def _process_SUBSCRIBED(msg)
# Remove the pending subscription, add it to the registered ones, and inform the caller
s = self._requests[:subscribe].delete(msg.subscribe_request)
if s
@@ -452,11 +452,11 @@
end
end
# Processes an error from a request
- # @param msg [Wamp::Client::Message::Error] The response from the subscribe
+ # @param msg [Message::Error] The response from the subscribe
def _process_SUBSCRIBE_error(msg)
# Remove the pending subscription and inform the caller of the failure
s = self._requests[:subscribe].delete(msg.request_request)
if s
@@ -471,11 +471,11 @@
end
end
# Processes and event from the broker
- # @param msg [Wamp::Client::Message::Event] An event that was published
+ # @param msg [Message::Event] An event that was published
def _process_EVENT(msg)
args = msg.publish_arguments || []
kwargs = msg.publish_argumentskw || {}
@@ -508,16 +508,16 @@
# Create a new unsubscribe request
request = self._generate_id
self._requests[:unsubscribe][request] = { s: subscription, c: callback }
# Send the message
- unsubscribe = Wamp::Client::Message::Unsubscribe.new(request, subscription.id)
+ unsubscribe = Message::Unsubscribe.new(request, subscription.id)
self._send_message(unsubscribe)
end
# Processes the response to a unsubscribe request
- # @param msg [Wamp::Client::Message::Unsubscribed] The response from the unsubscribe
+ # @param msg [Message::Unsubscribed] The response from the unsubscribe
def _process_UNSUBSCRIBED(msg)
# Remove the pending unsubscription, add it to the registered ones, and inform the caller
s = self._requests[:unsubscribe].delete(msg.unsubscribe_request)
if s
@@ -535,11 +535,11 @@
end
# Processes an error from a request
- # @param msg [Wamp::Client::Message::Error] The response from the subscribe
+ # @param msg [Message::Error] The response from the subscribe
def _process_UNSUBSCRIBE_error(msg)
# Remove the pending subscription and inform the caller of the failure
s = self._requests[:unsubscribe].delete(msg.request_request)
if s
@@ -578,16 +578,16 @@
# Create a new publish request
request = self._generate_id
self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback} if options[:acknowledge]
# Send the message
- publish = Wamp::Client::Message::Publish.new(request, options, topic, args, kwargs)
+ publish = Message::Publish.new(request, options, topic, args, kwargs)
self._send_message(publish)
end
# Processes the response to a publish request
- # @param msg [Wamp::Client::Message::Published] The response from the subscribe
+ # @param msg [Message::Published] The response from the subscribe
def _process_PUBLISHED(msg)
# Remove the pending publish and alert the callback
p = self._requests[:publish].delete(msg.publish_request)
if p
@@ -603,11 +603,11 @@
end
end
# Processes an error from a publish request
- # @param msg [Wamp::Client::Message::Error] The response from the subscribe
+ # @param msg [Message::Error] The response from the subscribe
def _process_PUBLISH_error(msg)
# Remove the pending publish and inform the caller of the failure
s = self._requests[:publish].delete(msg.request_request)
if s
@@ -646,16 +646,16 @@
# Create a new registration request
request = self._generate_id
self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback}
# Send the message
- register = Wamp::Client::Message::Register.new(request, options, procedure)
+ register = Message::Register.new(request, options, procedure)
self._send_message(register)
end
# Processes the response to a register request
- # @param msg [Wamp::Client::Message::Registered] The response from the subscribe
+ # @param msg [Message::Registered] The response from the subscribe
def _process_REGISTERED(msg)
# Remove the pending subscription, add it to the registered ones, and inform the caller
r = self._requests[:register].delete(msg.register_request)
if r
@@ -672,11 +672,11 @@
end
end
# Processes an error from a request
- # @param msg [Wamp::Client::Message::Error] The response from the register
+ # @param msg [Message::Error] The response from the register
def _process_REGISTER_error(msg)
# Remove the pending registration and inform the caller of the failure
r = self._requests[:register].delete(msg.request_request)
if r
@@ -706,11 +706,11 @@
elsif not error.is_a?(CallError)
backtrace = error.is_a?(Exception) ? error.backtrace : nil
error = CallError.new('wamp.error.runtime', [error.to_s], { backtrace: backtrace })
end
- error_msg = Wamp::Client::Message::Error.new(Wamp::Client::Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs)
+ error_msg = Message::Error.new(Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs)
self._send_message(error_msg)
end
# Sends a result for the invocation
# @param request [Integer] - The id of the request
@@ -720,29 +720,36 @@
# Prevent responses for defers that have already completed or had an error
if check_defer and not self._defers[request]
return
end
+ # Wrap the result accordingly
if result.nil?
result = CallResult.new
elsif result.is_a?(CallError)
# Do nothing
elsif not result.is_a?(CallResult)
result = CallResult.new([result])
end
+ # Send either the error or the response
if result.is_a?(CallError)
self._send_INVOCATION_error(request, result)
else
- yield_msg = Wamp::Client::Message::Yield.new(request, options, result.args, result.kwargs)
+ yield_msg = Message::Yield.new(request, options, result.args, result.kwargs)
self._send_message(yield_msg)
end
+
+ # Remove the defer if this was not a progress update
+ if check_defer and options[:progress] == nil
+ self._defers.delete(request)
+ end
end
# Processes and event from the broker
- # @param msg [Wamp::Client::Message::Invocation] An procedure that was called
+ # @param msg [Message::Invocation] An procedure that was called
def _process_INVOCATION(msg)
request = msg.request
args = msg.call_arguments || []
kwargs = msg.call_argumentskw || {}
@@ -757,37 +764,36 @@
if h
begin
value = h.call(args, kwargs, details)
# If a defer was returned, handle accordingly
- if value.is_a? Wamp::Client::Defer::CallDefer
+ if value.is_a? Defer::CallDefer
value.request = request
value.registration = msg.registered_registration
# Store the defer
self._defers[request] = value
# On complete, send the result
value.on_complete do |defer, result|
self.yield(defer.request, result, {}, true)
- self._defers.delete(defer.request)
end
# On error, send the error
value.on_error do |defer, error|
- self._send_INVOCATION_error(defer.request, error, true)
- self._defers.delete(defer.request)
+ error = CallError.new("wamp.error.runtime", [error]) if error.is_a?(String)
+ self.yield(defer.request, error, {}, true)
end
# For progressive, return the progress
- if value.is_a? Wamp::Client::Defer::ProgressiveCallDefer
+ if value.is_a? Defer::ProgressiveCallDefer
value.on_progress do |defer, result|
self.yield(defer.request, result, {progress: true}, true)
end
end
- # Else it was a normal response
+ # Else it was a normal response
else
self.yield(request, value)
end
rescue Exception => error
@@ -797,11 +803,11 @@
end
end
end
# Processes the interrupt
- # @param msg [Wamp::Client::Message::Interrupt] An interrupt to a procedure
+ # @param msg [Message::Interrupt] An interrupt to a procedure
def _process_INTERRUPT(msg)
request = msg.invocation_request
mode = msg.options[:mode]
@@ -849,16 +855,16 @@
# Create a new unsubscribe request
request = self._generate_id
self._requests[:unregister][request] = { r: registration, c: callback }
# Send the message
- unregister = Wamp::Client::Message::Unregister.new(request, registration.id)
+ unregister = Message::Unregister.new(request, registration.id)
self._send_message(unregister)
end
# Processes the response to a unregister request
- # @param msg [Wamp::Client::Message::Unregistered] The response from the unsubscribe
+ # @param msg [Message::Unregistered] The response from the unsubscribe
def _process_UNREGISTERED(msg)
# Remove the pending unregistration, add it to the registered ones, and inform the caller
r = self._requests[:unregister].delete(msg.unregister_request)
if r
@@ -875,11 +881,11 @@
end
end
# Processes an error from a request
- # @param msg [Wamp::Client::Message::Error] The response from the subscribe
+ # @param msg [Message::Error] The response from the subscribe
def _process_UNREGISTER_error(msg)
# Remove the pending subscription and inform the caller of the failure
r = self._requests[:unregister].delete(msg.request_request)
if r
@@ -919,11 +925,11 @@
# Create a new call request
request = self._generate_id
self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback}
# Send the message
- msg = Wamp::Client::Message::Call.new(request, options, procedure, args, kwargs)
+ msg = Message::Call.new(request, options, procedure, args, kwargs)
self._send_message(msg)
call = Call.new(self, request)
# Timeout Logic
@@ -938,11 +944,11 @@
call
end
# Processes the response to a publish request
- # @param msg [Wamp::Client::Message::Result] The response from the call
+ # @param msg [Message::Result] The response from the call
def _process_RESULT(msg)
details = msg.details || {}
call = self._requests[:call][msg.call_request]
@@ -960,11 +966,11 @@
end
end
# Processes an error from a call request
- # @param msg [Wamp::Client::Message::Error] The response from the call
+ # @param msg [Message::Error] The response from the call
def _process_CALL_error(msg)
# Remove the pending publish and inform the caller of the failure
call = self._requests[:call].delete(msg.request_request)
if call
@@ -993,10 +999,10 @@
end
self.class.check_nil('call', call, false)
# Send the message
- cancel = Wamp::Client::Message::Cancel.new(call.id, { mode: mode })
+ cancel = Message::Cancel.new(call.id, { mode: mode })
self._send_message(cancel)
end
#endregion
\ No newline at end of file