lib/wamp/client/session.rb in wamp_client-0.1.3 vs lib/wamp/client/session.rb in wamp_client-0.1.4

- old
+ new

@@ -159,11 +159,11 @@ end end class Session - include Wamp::Client::Check + include Check # on_join callback is called when the session joins the router. It has the following parameters # @param details [Hash] Object containing information about the joined session @on_join def on_join(&on_join) @@ -206,11 +206,11 @@ # Private attributes attr_accessor :_goodbye_sent, :_requests, :_subscriptions, :_registrations, :_defers # Constructor - # @param transport [Wamp::Client::Transport::Base] The transport that the session will use + # @param transport [Transport::Base] The transport that the session will use # @param options [Hash] Hash containing different session options # @option options [String] :authid The authentication ID # @option options [Array] :authmethods Different auth methods that this client supports def initialize(transport, options={}) @@ -266,16 +266,16 @@ self.realm = realm details = {} details[:roles] = WAMP_FEATURES - details[:agent] = "Ruby-Wamp::Client-#{Wamp::Client::VERSION}" + details[:agent] = "Ruby-Wamp::Client-#{VERSION}" details[:authid] = self.options[:authid] if self.options[:authid] details[:authmethods] = self.options[:authmethods] if self.options[:authmethods] # Send Hello message - hello = Wamp::Client::Message::Hello.new(realm, details) + hello = Message::Hello.new(realm, details) self._send_message(hello) end # Leaves the WAMP Router # @param reason [String] URI signalling the reason for leaving @@ -289,32 +289,32 @@ details = {} details[:message] = message # Send Goodbye message - goodbye = Wamp::Client::Message::Goodbye.new(details, reason) + goodbye = Message::Goodbye.new(details, reason) self._send_message(goodbye) self._goodbye_sent = true end # Generates an ID according to the specification (Section 5.1.2) def _generate_id rand(0..9007199254740992) end # Converts and error message to a hash - # @param msg [Wamp::Client::Message::Error] + # @param msg [Message::Error] def _error_to_hash(msg) { error: msg.error, args: msg.arguments, kwargs: msg.argumentskw } end # Sends a message - # @param msg [Wamp::Client::Message::Base] + # @param msg [Message::Base] def _send_message(msg) # Log the message logger.debug("#{self.class.name} TX: #{msg.to_s}") # Send it to the transport @@ -327,29 +327,29 @@ # Print the raw message logger.debug("#{self.class.name} RX(raw): #{msg.to_s}") # Parse the WAMP message - message = Wamp::Client::Message.parse(msg) + message = Message.parse(msg) # Print the parsed WAMP message logger.debug("#{self.class.name} RX: #{message.to_s}") # WAMP Session is not open if self.id.nil? # Parse the welcome message - if message.is_a? Wamp::Client::Message::Welcome + if message.is_a? Message::Welcome # Get the session ID self.id = message.session # Log joining the session logger.info("#{self.class.name} joined session with realm '#{message.details[:realm]}'") # Call the callback if it is set @on_join.call(message.details) unless @on_join.nil? - elsif message.is_a? Wamp::Client::Message::Challenge + elsif message.is_a? Message::Challenge # Log challenge received logger.debug("#{self.class.name} auth challenge '#{message.authmethod}', extra: #{message.extra}") # Call the callback if set if @on_challenge @@ -360,14 +360,14 @@ end signature ||= '' extra ||= {} - authenticate = Wamp::Client::Message::Authenticate.new(signature, extra) + authenticate = Message::Authenticate.new(signature, extra) self._send_message(authenticate) - elsif message.is_a? Wamp::Client::Message::Abort + elsif message.is_a? Message::Abort # Log leaving the session logger.info("#{self.class.name} left session '#{message.reason}'") # Call the callback if it is set @on_leave.call(message.reason, message.details) unless @on_leave.nil? @@ -375,15 +375,15 @@ # Wamp Session is open else # If goodbye, close the session - if message.is_a? Wamp::Client::Message::Goodbye + if message.is_a? Message::Goodbye # If we didn't send the goodbye, respond unless self._goodbye_sent - goodbye = Wamp::Client::Message::Goodbye.new({}, 'wamp.error.goodbye_and_out') + goodbye = Message::Goodbye.new({}, 'wamp.error.goodbye_and_out') self._send_message(goodbye) end # Close out session self.id = nil @@ -426,16 +426,16 @@ # Create a new subscribe request request = self._generate_id self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback} # Send the message - subscribe = Wamp::Client::Message::Subscribe.new(request, options, topic) + subscribe = Message::Subscribe.new(request, options, topic) self._send_message(subscribe) end # Processes the response to a subscribe request - # @param msg [Wamp::Client::Message::Subscribed] The response from the subscribe + # @param msg [Message::Subscribed] The response from the subscribe def _process_SUBSCRIBED(msg) # Remove the pending subscription, add it to the registered ones, and inform the caller s = self._requests[:subscribe].delete(msg.subscribe_request) if s @@ -452,11 +452,11 @@ end end # Processes an error from a request - # @param msg [Wamp::Client::Message::Error] The response from the subscribe + # @param msg [Message::Error] The response from the subscribe def _process_SUBSCRIBE_error(msg) # Remove the pending subscription and inform the caller of the failure s = self._requests[:subscribe].delete(msg.request_request) if s @@ -471,11 +471,11 @@ end end # Processes and event from the broker - # @param msg [Wamp::Client::Message::Event] An event that was published + # @param msg [Message::Event] An event that was published def _process_EVENT(msg) args = msg.publish_arguments || [] kwargs = msg.publish_argumentskw || {} @@ -508,16 +508,16 @@ # Create a new unsubscribe request request = self._generate_id self._requests[:unsubscribe][request] = { s: subscription, c: callback } # Send the message - unsubscribe = Wamp::Client::Message::Unsubscribe.new(request, subscription.id) + unsubscribe = Message::Unsubscribe.new(request, subscription.id) self._send_message(unsubscribe) end # Processes the response to a unsubscribe request - # @param msg [Wamp::Client::Message::Unsubscribed] The response from the unsubscribe + # @param msg [Message::Unsubscribed] The response from the unsubscribe def _process_UNSUBSCRIBED(msg) # Remove the pending unsubscription, add it to the registered ones, and inform the caller s = self._requests[:unsubscribe].delete(msg.unsubscribe_request) if s @@ -535,11 +535,11 @@ end # Processes an error from a request - # @param msg [Wamp::Client::Message::Error] The response from the subscribe + # @param msg [Message::Error] The response from the subscribe def _process_UNSUBSCRIBE_error(msg) # Remove the pending subscription and inform the caller of the failure s = self._requests[:unsubscribe].delete(msg.request_request) if s @@ -578,16 +578,16 @@ # Create a new publish request request = self._generate_id self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback} if options[:acknowledge] # Send the message - publish = Wamp::Client::Message::Publish.new(request, options, topic, args, kwargs) + publish = Message::Publish.new(request, options, topic, args, kwargs) self._send_message(publish) end # Processes the response to a publish request - # @param msg [Wamp::Client::Message::Published] The response from the subscribe + # @param msg [Message::Published] The response from the subscribe def _process_PUBLISHED(msg) # Remove the pending publish and alert the callback p = self._requests[:publish].delete(msg.publish_request) if p @@ -603,11 +603,11 @@ end end # Processes an error from a publish request - # @param msg [Wamp::Client::Message::Error] The response from the subscribe + # @param msg [Message::Error] The response from the subscribe def _process_PUBLISH_error(msg) # Remove the pending publish and inform the caller of the failure s = self._requests[:publish].delete(msg.request_request) if s @@ -646,16 +646,16 @@ # Create a new registration request request = self._generate_id self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback} # Send the message - register = Wamp::Client::Message::Register.new(request, options, procedure) + register = Message::Register.new(request, options, procedure) self._send_message(register) end # Processes the response to a register request - # @param msg [Wamp::Client::Message::Registered] The response from the subscribe + # @param msg [Message::Registered] The response from the subscribe def _process_REGISTERED(msg) # Remove the pending subscription, add it to the registered ones, and inform the caller r = self._requests[:register].delete(msg.register_request) if r @@ -672,11 +672,11 @@ end end # Processes an error from a request - # @param msg [Wamp::Client::Message::Error] The response from the register + # @param msg [Message::Error] The response from the register def _process_REGISTER_error(msg) # Remove the pending registration and inform the caller of the failure r = self._requests[:register].delete(msg.request_request) if r @@ -706,11 +706,11 @@ elsif not error.is_a?(CallError) backtrace = error.is_a?(Exception) ? error.backtrace : nil error = CallError.new('wamp.error.runtime', [error.to_s], { backtrace: backtrace }) end - error_msg = Wamp::Client::Message::Error.new(Wamp::Client::Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs) + error_msg = Message::Error.new(Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs) self._send_message(error_msg) end # Sends a result for the invocation # @param request [Integer] - The id of the request @@ -720,29 +720,36 @@ # Prevent responses for defers that have already completed or had an error if check_defer and not self._defers[request] return end + # Wrap the result accordingly if result.nil? result = CallResult.new elsif result.is_a?(CallError) # Do nothing elsif not result.is_a?(CallResult) result = CallResult.new([result]) end + # Send either the error or the response if result.is_a?(CallError) self._send_INVOCATION_error(request, result) else - yield_msg = Wamp::Client::Message::Yield.new(request, options, result.args, result.kwargs) + yield_msg = Message::Yield.new(request, options, result.args, result.kwargs) self._send_message(yield_msg) end + + # Remove the defer if this was not a progress update + if check_defer and options[:progress] == nil + self._defers.delete(request) + end end # Processes and event from the broker - # @param msg [Wamp::Client::Message::Invocation] An procedure that was called + # @param msg [Message::Invocation] An procedure that was called def _process_INVOCATION(msg) request = msg.request args = msg.call_arguments || [] kwargs = msg.call_argumentskw || {} @@ -757,37 +764,36 @@ if h begin value = h.call(args, kwargs, details) # If a defer was returned, handle accordingly - if value.is_a? Wamp::Client::Defer::CallDefer + if value.is_a? Defer::CallDefer value.request = request value.registration = msg.registered_registration # Store the defer self._defers[request] = value # On complete, send the result value.on_complete do |defer, result| self.yield(defer.request, result, {}, true) - self._defers.delete(defer.request) end # On error, send the error value.on_error do |defer, error| - self._send_INVOCATION_error(defer.request, error, true) - self._defers.delete(defer.request) + error = CallError.new("wamp.error.runtime", [error]) if error.is_a?(String) + self.yield(defer.request, error, {}, true) end # For progressive, return the progress - if value.is_a? Wamp::Client::Defer::ProgressiveCallDefer + if value.is_a? Defer::ProgressiveCallDefer value.on_progress do |defer, result| self.yield(defer.request, result, {progress: true}, true) end end - # Else it was a normal response + # Else it was a normal response else self.yield(request, value) end rescue Exception => error @@ -797,11 +803,11 @@ end end end # Processes the interrupt - # @param msg [Wamp::Client::Message::Interrupt] An interrupt to a procedure + # @param msg [Message::Interrupt] An interrupt to a procedure def _process_INTERRUPT(msg) request = msg.invocation_request mode = msg.options[:mode] @@ -849,16 +855,16 @@ # Create a new unsubscribe request request = self._generate_id self._requests[:unregister][request] = { r: registration, c: callback } # Send the message - unregister = Wamp::Client::Message::Unregister.new(request, registration.id) + unregister = Message::Unregister.new(request, registration.id) self._send_message(unregister) end # Processes the response to a unregister request - # @param msg [Wamp::Client::Message::Unregistered] The response from the unsubscribe + # @param msg [Message::Unregistered] The response from the unsubscribe def _process_UNREGISTERED(msg) # Remove the pending unregistration, add it to the registered ones, and inform the caller r = self._requests[:unregister].delete(msg.unregister_request) if r @@ -875,11 +881,11 @@ end end # Processes an error from a request - # @param msg [Wamp::Client::Message::Error] The response from the subscribe + # @param msg [Message::Error] The response from the subscribe def _process_UNREGISTER_error(msg) # Remove the pending subscription and inform the caller of the failure r = self._requests[:unregister].delete(msg.request_request) if r @@ -919,11 +925,11 @@ # Create a new call request request = self._generate_id self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback} # Send the message - msg = Wamp::Client::Message::Call.new(request, options, procedure, args, kwargs) + msg = Message::Call.new(request, options, procedure, args, kwargs) self._send_message(msg) call = Call.new(self, request) # Timeout Logic @@ -938,11 +944,11 @@ call end # Processes the response to a publish request - # @param msg [Wamp::Client::Message::Result] The response from the call + # @param msg [Message::Result] The response from the call def _process_RESULT(msg) details = msg.details || {} call = self._requests[:call][msg.call_request] @@ -960,11 +966,11 @@ end end # Processes an error from a call request - # @param msg [Wamp::Client::Message::Error] The response from the call + # @param msg [Message::Error] The response from the call def _process_CALL_error(msg) # Remove the pending publish and inform the caller of the failure call = self._requests[:call].delete(msg.request_request) if call @@ -993,10 +999,10 @@ end self.class.check_nil('call', call, false) # Send the message - cancel = Wamp::Client::Message::Cancel.new(call.id, { mode: mode }) + cancel = Message::Cancel.new(call.id, { mode: mode }) self._send_message(cancel) end #endregion \ No newline at end of file