lib/wamp/client/session.rb in wamp_client-0.1.4 vs lib/wamp/client/session.rb in wamp_client-0.2.0

- old
+ new

@@ -1,1019 +1,352 @@ -=begin - -Copyright (c) 2018 Eric Chapman - -MIT License - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -=end - require 'wamp/client/transport/base' require 'wamp/client/message' require 'wamp/client/check' require 'wamp/client/version' +require 'wamp/client/event' +require 'wamp/client/request/require' +require 'wamp/client/manager/require' module Wamp module Client - WAMP_FEATURES = { - caller: { - features: { - caller_identification: true, - call_timeout: true, - call_canceling: true, - progressive_call_results: true - } - }, - callee: { - features: { - caller_identification: true, - ##call_trustlevels: true, - pattern_based_registration: true, - shared_registration: true, - ##call_timeout: true, - call_canceling: true, - progressive_call_results: true, - registration_revocation: true - } - }, - publisher: { - features: { - publisher_identification: true, - subscriber_blackwhite_listing: true, - publisher_exclusion: true - } - }, - subscriber: { - features: { - publisher_identification: true, - ##publication_trustlevels: true, - pattern_based_subscription: true, - subscription_revocation: true - ##event_history: true, - } - } + + CLOSED_SESSION_METHOD_LOOKUP = { + Message::Types::WELCOME => -> s, m { s.establish.welcome(m) }, + Message::Types::CHALLENGE => -> s, m { s.establish.challenge(m) }, + Message::Types::ABORT => -> s, m { s.establish.abort(m) }, } - HANDLER_LOOKUP = { + OPEN_SESSION_METHOD_LOOKUP = { + # Establish Response + Message::Types::GOODBYE => -> s, m { s.establish.goodbye(m) }, + # Error Responses - Message::Types::SUBSCRIBE => -> s,m { s._process_SUBSCRIBE_error(m) }, - Message::Types::UNSUBSCRIBE => -> s,m { s._process_UNSUBSCRIBE_error(m) }, - Message::Types::PUBLISH => -> s,m { s._process_PUBLISH_error(m) }, - Message::Types::REGISTER => -> s,m { s._process_REGISTER_error(m) }, - Message::Types::UNREGISTER => -> s,m { s._process_UNREGISTER_error(m) }, - Message::Types::CALL => -> s,m { s._process_CALL_error(m) }, + Message::Types::SUBSCRIBE => -> s, m { s.request[:subscribe].error(m) }, + Message::Types::UNSUBSCRIBE => -> s, m { s.request[:unsubscribe].error(m) }, + Message::Types::PUBLISH => -> s, m { s.request[:publish].error(m) }, + Message::Types::REGISTER => -> s, m { s.request[:register].error(m) }, + Message::Types::UNREGISTER => -> s, m { s.request[:unregister].error(m) }, + Message::Types::CALL => -> s, m { s.request[:call].error(m) }, # Result Responses - Message::Types::SUBSCRIBED => -> s,m { s._process_SUBSCRIBED(m) }, - Message::Types::UNSUBSCRIBED => -> s,m { s._process_UNSUBSCRIBED(m) }, - Message::Types::PUBLISHED => -> s,m { s._process_PUBLISHED(m) }, - Message::Types::EVENT => -> s,m { s._process_EVENT(m) }, - Message::Types::REGISTERED => -> s,m { s._process_REGISTERED(m) }, - Message::Types::UNREGISTERED => -> s,m { s._process_UNREGISTERED(m) }, - Message::Types::INVOCATION => -> s,m { s._process_INVOCATION(m) }, - Message::Types::INTERRUPT => -> s,m { s._process_INTERRUPT(m) }, - Message::Types::RESULT => -> s,m { s._process_RESULT(m) }, + Message::Types::SUBSCRIBED => -> s, m { s.request[:subscribe].success(m) }, + Message::Types::UNSUBSCRIBED => -> s, m { s.request[:unsubscribe].success(m) }, + Message::Types::PUBLISHED => -> s, m { s.request[:publish].success(m) }, + Message::Types::EVENT => -> s, m { s.subscription.event(m) }, + Message::Types::REGISTERED => -> s, m { s.request[:register].success(m) }, + Message::Types::UNREGISTERED => -> s, m { s.request[:unregister].success(m) }, + Message::Types::INVOCATION => -> s, m { s.registration.invoke(m) }, + Message::Types::INTERRUPT => -> s, m { s.registration.interrupt(m) }, + Message::Types::RESULT => -> s, m { s.request[:call].success(m) }, } - class CallResult - attr_accessor :args, :kwargs - - def initialize(args=nil, kwargs=nil) - self.args = args || [] - self.kwargs = kwargs || {} - end - end - - class CallError < Exception - attr_accessor :error, :args, :kwargs - - def initialize(error, args=nil, kwargs=nil) - self.error = error - self.args = args || [] - self.kwargs = kwargs || {} - end - end - - class Subscription - attr_accessor :topic, :handler, :options, :session, :id - - def initialize(topic, handler, options, session, id) - self.topic = topic - self.handler = handler - self.options = options - self.session = session - self.id = id - end - - def unsubscribe - self.session.unsubscribe(self) - end - - end - - class Registration - attr_accessor :procedure, :handler, :i_handler, :options, :session, :id - - def initialize(procedure, handler, options, i_handler, session, id) - self.procedure = procedure - self.handler = handler - self.options = options - self.i_handler = i_handler - self.session = session - self.id = id - end - - def unregister - self.session.unregister(self) - end - - end - - class Call - attr_accessor :session, :id - - def initialize(session, id) - self.session = session - self.id = id - end - - def cancel(mode='skip') - self.session.cancel(self, mode) - end - - end - class Session include Check + include Event - # on_join callback is called when the session joins the router. It has the following parameters - # @param details [Hash] Object containing information about the joined session - @on_join - def on_join(&on_join) - @on_join = on_join - end + attr_accessor :transport, :options, :request, :callback, + :subscription, :registration, :establish - # on_leave callback is called when the session leaves the router. It has the following attributes - # @param reason [String] The reason the session left the router - # @param details [Hash] Object containing information about the left session - @on_leave - def on_leave(&on_leave) - @on_leave = on_leave - end + create_event [:join, :challenge, :leave] - # on_challenge callback is called when an authentication challenge is received from the router. It has the - # following attributes - # @param authmethod [String] The type of auth being requested - # @param extra [Hash] Hash containing additional information - # @return signature, extras - @on_challenge - def on_challenge(&on_challenge) - @on_challenge = on_challenge - end - - # Simple setter for callbacks - def on(event, &callback) - case event - when :join - self.on_join(&callback) - when :challenge - self.on_challenge(&callback) - when :leave - self.on_leave(&callback) - else - raise RuntimeError, "Unknown on(event) '#{event}'" - end - end - - attr_accessor :id, :realm, :transport, :options - - # Private attributes - attr_accessor :_goodbye_sent, :_requests, :_subscriptions, :_registrations, :_defers - # Constructor # @param transport [Transport::Base] The transport that the session will use # @param options [Hash] Hash containing different session options # @option options [String] :authid The authentication ID # @option options [Array] :authmethods Different auth methods that this client supports def initialize(transport, options={}) # Parameters - self.id = nil - self.realm = nil self.options = options || {} + # Log the event + logger.info("#{self.class.name} created with options") + logger.info(" uri: #{options[:uri]}") + logger.info(" realm: #{options[:realm]}") + + # Create the send message lambda for the request objects + send_message_lambda = -> m { send_message(m) } + # Outstanding Requests - self._requests = { - publish: {}, - subscribe: {}, - unsubscribe: {}, - call: {}, - register: {}, - unregister: {} + self.request = { + publish: Request::Publish.new(self, send_message_lambda), + subscribe: Request::Subscribe.new(self, send_message_lambda) { |s_id, s| self.subscription.add(s_id, s) }, + unsubscribe: Request::Unsubscribe.new(self, send_message_lambda) { |s_id| self.subscription.remove(s_id) }, + call: Request::Call.new(self, send_message_lambda), + register: Request::Register.new(self, send_message_lambda) { |r_id, r| self.registration.add(r_id, r) }, + unregister: Request::Unregister.new(self, send_message_lambda) { |r_id| self.registration.remove(r_id) }, } # Init Subs and Regs in place - self._subscriptions = {} - self._registrations = {} - self._defers = {} + self.subscription = Manager::Subscription.new(self, send_message_lambda) + self.registration = Manager::Registration.new(self, send_message_lambda) + self.establish = Manager::Establish.new(self, send_message_lambda) + # Setup session callbacks + self.callback = {} + # Setup Transport self.transport = transport - self.transport.on_message do |msg| - self._receive_message(msg) + self.transport.on :message do |msg| + receive_message(msg) end - # Other parameters - self._goodbye_sent = false - - # Setup session callbacks - @on_join = nil - @on_leave = nil - @on_challenge = nil - end # Returns 'true' if the session is open + # def is_open? - !self.id.nil? + self.establish.is_open? end + # Returns the ID of the session + # + def id + self.establish.id + end + + # Returns the realm of the session + # + def realm + self.establish.realm + end + # Joins the WAMP Router + # # @param realm [String] The name of the realm def join(realm) - if is_open? - raise RuntimeError, "Session must be closed to call 'join'" - end + check_closed + # Check params self.class.check_uri('realm', realm) - self.realm = realm - - details = {} - details[:roles] = WAMP_FEATURES - details[:agent] = "Ruby-Wamp::Client-#{VERSION}" - details[:authid] = self.options[:authid] if self.options[:authid] - details[:authmethods] = self.options[:authmethods] if self.options[:authmethods] - - # Send Hello message - hello = Message::Hello.new(realm, details) - self._send_message(hello) + # Attempt to join + self.establish.join(realm) end # Leaves the WAMP Router + # # @param reason [String] URI signalling the reason for leaving def leave(reason='wamp.close.normal', message='user initiated') - unless is_open? - raise RuntimeError, "Session must be opened to call 'leave'" - end + check_open + # Check params self.class.check_uri('reason', reason, true) self.class.check_string('message', message, true) - details = {} - details[:message] = message - - # Send Goodbye message - goodbye = Message::Goodbye.new(details, reason) - self._send_message(goodbye) - self._goodbye_sent = true + # Leave the session + self.establish.leave(reason, message) end - # Generates an ID according to the specification (Section 5.1.2) - def _generate_id - rand(0..9007199254740992) - end - - # Converts and error message to a hash - # @param msg [Message::Error] - def _error_to_hash(msg) - { - error: msg.error, - args: msg.arguments, - kwargs: msg.argumentskw - } - end - - # Sends a message - # @param msg [Message::Base] - def _send_message(msg) - # Log the message - logger.debug("#{self.class.name} TX: #{msg.to_s}") - - # Send it to the transport - self.transport.send_message(msg.payload) - end - - # Processes received messages - # @param msg [Array] - def _receive_message(msg) - - # Print the raw message - logger.debug("#{self.class.name} RX(raw): #{msg.to_s}") - - # Parse the WAMP message - message = Message.parse(msg) - - # Print the parsed WAMP message - logger.debug("#{self.class.name} RX: #{message.to_s}") - - # WAMP Session is not open - if self.id.nil? - - # Parse the welcome message - if message.is_a? Message::Welcome - # Get the session ID - self.id = message.session - - # Log joining the session - logger.info("#{self.class.name} joined session with realm '#{message.details[:realm]}'") - - # Call the callback if it is set - @on_join.call(message.details) unless @on_join.nil? - elsif message.is_a? Message::Challenge - # Log challenge received - logger.debug("#{self.class.name} auth challenge '#{message.authmethod}', extra: #{message.extra}") - - # Call the callback if set - if @on_challenge - signature, extra = @on_challenge.call(message.authmethod, message.extra) - else - signature = nil - extra = nil - end - - signature ||= '' - extra ||= {} - - authenticate = Message::Authenticate.new(signature, extra) - self._send_message(authenticate) - - elsif message.is_a? Message::Abort - # Log leaving the session - logger.info("#{self.class.name} left session '#{message.reason}'") - - # Call the callback if it is set - @on_leave.call(message.reason, message.details) unless @on_leave.nil? - end - - # Wamp Session is open - else - - # If goodbye, close the session - if message.is_a? Message::Goodbye - - # If we didn't send the goodbye, respond - unless self._goodbye_sent - goodbye = Message::Goodbye.new({}, 'wamp.error.goodbye_and_out') - self._send_message(goodbye) - end - - # Close out session - self.id = nil - self.realm = nil - self._goodbye_sent = false - @on_leave.call(message.reason, message.details) unless @on_leave.nil? - - else - - # Else this is a normal message. Lookup the handler and call it - type = message.is_a?(Message::Error) ? message.request_type : message.class.type - handler = HANDLER_LOOKUP[type] - - if handler != nil - handler.call(self, message) - else - logger.error("#{self.class.name} unknown message type '#{type}'") - end - end - end - - end - - #region Subscribe Logic - # Subscribes to a topic + # # @param topic [String] The topic to subscribe to # @param handler [lambda] The handler(args, kwargs, details) when an event is received # @param options [Hash] The options for the subscription # @param callback [block] The callback(subscription, error) called to signal if the subscription was a success or not def subscribe(topic, handler, options={}, &callback) - unless is_open? - raise RuntimeError, "Session must be open to call 'subscribe'" - end + check_open + # Check params self.class.check_uri('topic', topic) self.class.check_dict('options', options) self.class.check_nil('handler', handler, false) - # Create a new subscribe request - request = self._generate_id - self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback} - - # Send the message - subscribe = Message::Subscribe.new(request, options, topic) - self._send_message(subscribe) + # Make the request + make_request(:subscribe, :request, topic, handler, options, &callback) end - # Processes the response to a subscribe request - # @param msg [Message::Subscribed] The response from the subscribe - def _process_SUBSCRIBED(msg) - - # Remove the pending subscription, add it to the registered ones, and inform the caller - s = self._requests[:subscribe].delete(msg.subscribe_request) - if s - - details = {} - details[:topic] = s[:t] unless details[:topic] - details[:type] = 'subscribe' - details[:session] = self - - n_s = Subscription.new(s[:t], s[:h], s[:o], self, msg.subscription) - self._subscriptions[msg.subscription] = n_s - c = s[:c] - c.call(n_s, nil, details) if c - end - - end - - # Processes an error from a request - # @param msg [Message::Error] The response from the subscribe - def _process_SUBSCRIBE_error(msg) - - # Remove the pending subscription and inform the caller of the failure - s = self._requests[:subscribe].delete(msg.request_request) - if s - - details = msg.details || {} - details[:topic] = s[:t] unless details[:topic] - details[:type] = 'subscribe' - details[:session] = self - - c = s[:c] - c.call(nil, self._error_to_hash(msg), details) if c - end - - end - - # Processes and event from the broker - # @param msg [Message::Event] An event that was published - def _process_EVENT(msg) - - args = msg.publish_arguments || [] - kwargs = msg.publish_argumentskw || {} - - s = self._subscriptions[msg.subscribed_subscription] - if s - details = msg.details || {} - details[:publication] = msg.published_publication - details[:session] = self - - h = s.handler - h.call(args, kwargs, details) if h - end - - end - - #endregion - - #region Unsubscribe Logic - # Unsubscribes from a subscription + # # @param subscription [Subscription] The subscription object from when the subscription was created # @param callback [block] The callback(subscription, error, details) called to signal if the subscription was a success or not def unsubscribe(subscription, &callback) - unless is_open? - raise RuntimeError, "Session must be open to call 'unsubscribe'" - end + check_open + # Check params self.class.check_nil('subscription', subscription, false) - # Create a new unsubscribe request - request = self._generate_id - self._requests[:unsubscribe][request] = { s: subscription, c: callback } - - # Send the message - unsubscribe = Message::Unsubscribe.new(request, subscription.id) - self._send_message(unsubscribe) + # Make the request + make_request(:unsubscribe, :request, subscription, &callback) end - # Processes the response to a unsubscribe request - # @param msg [Message::Unsubscribed] The response from the unsubscribe - def _process_UNSUBSCRIBED(msg) - - # Remove the pending unsubscription, add it to the registered ones, and inform the caller - s = self._requests[:unsubscribe].delete(msg.unsubscribe_request) - if s - n_s = s[:s] - self._subscriptions.delete(n_s.id) - - details = {} - details[:topic] = s[:s].topic - details[:type] = 'unsubscribe' - details[:session] = self - - c = s[:c] - c.call(n_s, nil, details) if c - end - - end - - - # Processes an error from a request - # @param msg [Message::Error] The response from the subscribe - def _process_UNSUBSCRIBE_error(msg) - - # Remove the pending subscription and inform the caller of the failure - s = self._requests[:unsubscribe].delete(msg.request_request) - if s - - details = msg.details || {} - details[:topic] = s[:s].topic unless details[:topic] - details[:type] = 'unsubscribe' - details[:session] = self - - c = s[:c] - c.call(nil, self._error_to_hash(msg), details) if c - end - - end - - #endregion - - #region Publish Logic - # Publishes and event to a topic + # # @param topic [String] The topic to publish the event to # @param args [Array] The arguments # @param kwargs [Hash] The keyword arguments # @param options [Hash] The options for the publish # @param callback [block] The callback(publish, error, details) called to signal if the publish was a success or not def publish(topic, args=nil, kwargs=nil, options={}, &callback) - unless is_open? - raise RuntimeError, "Session must be open to call 'publish'" - end + check_open + # Check params self.class.check_uri('topic', topic) self.class.check_dict('options', options) self.class.check_list('args', args, true) self.class.check_dict('kwargs', kwargs, true) - # Create a new publish request - request = self._generate_id - self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback} if options[:acknowledge] - - # Send the message - publish = Message::Publish.new(request, options, topic, args, kwargs) - self._send_message(publish) + # Make the request + make_request(:publish, :request, topic, args, kwargs, options, &callback) end - # Processes the response to a publish request - # @param msg [Message::Published] The response from the subscribe - def _process_PUBLISHED(msg) - - # Remove the pending publish and alert the callback - p = self._requests[:publish].delete(msg.publish_request) - if p - - details = {} - details[:topic] = p[:t] - details[:type] = 'publish' - details[:publication] = msg.publication - details[:session] = self - - c = p[:c] - c.call(p, nil, details) if c - end - - end - - # Processes an error from a publish request - # @param msg [Message::Error] The response from the subscribe - def _process_PUBLISH_error(msg) - - # Remove the pending publish and inform the caller of the failure - s = self._requests[:publish].delete(msg.request_request) - if s - - details = msg.details || {} - details[:topic] = s[:t] unless details[:topic] - details[:type] = 'publish' - details[:session] = self - - c = s[:c] - c.call(nil, self._error_to_hash(msg), details) if c - end - - end - - #endregion - - #region Register Logic - # Register to a procedure + # # @param procedure [String] The procedure to register for # @param handler [lambda] The handler(args, kwargs, details) when an invocation is received # @param options [Hash, nil] The options for the registration # @param interrupt [lambda] The handler(request, mode) when an interrupt is received # @param callback [block] The callback(registration, error, details) called to signal if the registration was a success or not def register(procedure, handler, options=nil, interrupt=nil, &callback) - unless is_open? - raise RuntimeError, "Session must be open to call 'register'" - end + check_open options ||= {} + # Check params self.class.check_uri('procedure', procedure) self.class.check_nil('handler', handler, false) - # Create a new registration request - request = self._generate_id - self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback} - - # Send the message - register = Message::Register.new(request, options, procedure) - self._send_message(register) + # Make the request + make_request(:register, :request, procedure, handler, options, interrupt, &callback) end - # Processes the response to a register request - # @param msg [Message::Registered] The response from the subscribe - def _process_REGISTERED(msg) - - # Remove the pending subscription, add it to the registered ones, and inform the caller - r = self._requests[:register].delete(msg.register_request) - if r - n_r = Registration.new(r[:p], r[:h], r[:o], r[:i], self, msg.registration) - self._registrations[msg.registration] = n_r - - details = {} - details[:procedure] = r[:p] - details[:type] = 'register' - details[:session] = self - - c = r[:c] - c.call(n_r, nil, details) if c - end - - end - - # Processes an error from a request - # @param msg [Message::Error] The response from the register - def _process_REGISTER_error(msg) - - # Remove the pending registration and inform the caller of the failure - r = self._requests[:register].delete(msg.request_request) - if r - - details = msg.details || {} - details[:procedure] = r[:p] unless details[:procedure] - details[:type] = 'register' - details[:session] = self - - c = r[:c] - c.call(nil, self._error_to_hash(msg), details) if c - end - - end - - # Sends an error back to the caller - # @param request[Integer] - The request ID - # @param error - def _send_INVOCATION_error(request, error, check_defer=false) - # Prevent responses for defers that have already completed or had an error - if check_defer and not self._defers[request] - return - end - - if error.nil? - error = CallError.new('wamp.error.runtime') - elsif not error.is_a?(CallError) - backtrace = error.is_a?(Exception) ? error.backtrace : nil - error = CallError.new('wamp.error.runtime', [error.to_s], { backtrace: backtrace }) - end - - error_msg = Message::Error.new(Message::Types::INVOCATION, request, {}, error.error, error.args, error.kwargs) - self._send_message(error_msg) - end - # Sends a result for the invocation + # # @param request [Integer] - The id of the request # @param result [CallError, CallResult, anything] - If it is a CallError, the error will be returned # @param options [Hash] - The options to be sent with the yield def yield(request, result, options={}, check_defer=false) - # Prevent responses for defers that have already completed or had an error - if check_defer and not self._defers[request] - return - end + check_open - # Wrap the result accordingly - if result.nil? - result = CallResult.new - elsif result.is_a?(CallError) - # Do nothing - elsif not result.is_a?(CallResult) - result = CallResult.new([result]) - end - - # Send either the error or the response - if result.is_a?(CallError) - self._send_INVOCATION_error(request, result) - else - yield_msg = Message::Yield.new(request, options, result.args, result.kwargs) - self._send_message(yield_msg) - end - - # Remove the defer if this was not a progress update - if check_defer and options[:progress] == nil - self._defers.delete(request) - end + # Call the registration yield method + self.registration.yield(request, result, options, check_defer) end - - # Processes and event from the broker - # @param msg [Message::Invocation] An procedure that was called - def _process_INVOCATION(msg) - - request = msg.request - args = msg.call_arguments || [] - kwargs = msg.call_argumentskw || {} - - details = msg.details || {} - details[:request] = request - details[:session] = self - - r = self._registrations[msg.registered_registration] - if r - h = r.handler - if h - begin - value = h.call(args, kwargs, details) - - # If a defer was returned, handle accordingly - if value.is_a? Defer::CallDefer - value.request = request - value.registration = msg.registered_registration - - # Store the defer - self._defers[request] = value - - # On complete, send the result - value.on_complete do |defer, result| - self.yield(defer.request, result, {}, true) - end - - # On error, send the error - value.on_error do |defer, error| - error = CallError.new("wamp.error.runtime", [error]) if error.is_a?(String) - self.yield(defer.request, error, {}, true) - end - - # For progressive, return the progress - if value.is_a? Defer::ProgressiveCallDefer - value.on_progress do |defer, result| - self.yield(defer.request, result, {progress: true}, true) - end - end - - # Else it was a normal response - else - self.yield(request, value) - end - - rescue Exception => error - self._send_INVOCATION_error(request, error) - end - - end - end - end - - # Processes the interrupt - # @param msg [Message::Interrupt] An interrupt to a procedure - def _process_INTERRUPT(msg) - - request = msg.invocation_request - mode = msg.options[:mode] - - defer = self._defers[request] - if defer - r = self._registrations[defer.registration] - if r - # If it exists, call the interrupt handler to inform it of the interrupt - i = r.i_handler - error = nil - if i - begin - error = i.call(request, mode) - rescue Exception => e - error = e - end - end - - error ||= 'interrupt' - - # Send the error back to the client - self._send_INVOCATION_error(request, error, true) - end - - # Delete the defer - self._defers.delete(request) - end - - end - - #endregion - - #region Unregister Logic - # Unregisters from a procedure + # # @param registration [Registration] The registration object from when the registration was created # @param callback [block] The callback(registration, error, details) called to signal if the unregistration was a success or not def unregister(registration, &callback) - unless is_open? - raise RuntimeError, "Session must be open to call 'unregister'" - end + check_open + # Check params self.class.check_nil('registration', registration, false) - # Create a new unsubscribe request - request = self._generate_id - self._requests[:unregister][request] = { r: registration, c: callback } - - # Send the message - unregister = Message::Unregister.new(request, registration.id) - self._send_message(unregister) + # Make the request + make_request(:unregister, :request, registration, &callback) end - # Processes the response to a unregister request - # @param msg [Message::Unregistered] The response from the unsubscribe - def _process_UNREGISTERED(msg) - - # Remove the pending unregistration, add it to the registered ones, and inform the caller - r = self._requests[:unregister].delete(msg.unregister_request) - if r - r_s = r[:r] - self._registrations.delete(r_s.id) - - details = {} - details[:procedure] = r_s.procedure - details[:type] = 'unregister' - details[:session] = self - - c = r[:c] - c.call(r_s, nil, details) if c - end - - end - - # Processes an error from a request - # @param msg [Message::Error] The response from the subscribe - def _process_UNREGISTER_error(msg) - - # Remove the pending subscription and inform the caller of the failure - r = self._requests[:unregister].delete(msg.request_request) - if r - - details = msg.details || {} - details[:procedure] = r[:r].procedure unless details[:procedure] - details[:type] = 'unregister' - details[:session] = self - - c = r[:c] - c.call(nil, self._error_to_hash(msg), details) if c - end - - end - - #endregion - - #region Call Logic - # Publishes and event to a topic + # # @param procedure [String] The procedure to invoke # @param args [Array] The arguments # @param kwargs [Hash] The keyword arguments # @param options [Hash] The options for the call # @param callback [block] The callback(result, error, details) called to signal if the call was a success or not # @return [Call] An object representing the call def call(procedure, args=nil, kwargs=nil, options={}, &callback) - unless is_open? - raise RuntimeError, "Session must be open to call 'call'" - end + check_open + # Check params self.class.check_uri('procedure', procedure) self.class.check_dict('options', options) self.class.check_list('args', args, true) self.class.check_dict('kwargs', kwargs, true) - # Create a new call request - request = self._generate_id - self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback} + # Make the request + request_id = make_request(:call, :request, procedure, args, kwargs, options, &callback) - # Send the message - msg = Message::Call.new(request, options, procedure, args, kwargs) - self._send_message(msg) + # Create the call object + call = Request::CallObject.new(self, request_id) - call = Call.new(self, request) - # Timeout Logic if options[:timeout] and options[:timeout] > 0 + # Once the timer expires, if the call hasn't completed, cancel it self.transport.add_timer(options[:timeout]) do - # Once the timer expires, if the call hasn't completed, cancel it - if self._requests[:call][call.id] - call.cancel - end + call.cancel end end call end - # Processes the response to a publish request - # @param msg [Message::Result] The response from the call - def _process_RESULT(msg) + # Cancels a call + # + # @param call [Call] - The call object + # @param mode [String] - The mode of the skip. Options are 'skip', 'kill', 'killnowait' + def cancel(call, mode='skip') + check_open - details = msg.details || {} + # Check params + self.class.check_nil('call', call, false) - call = self._requests[:call][msg.call_request] + # Cancel the request + make_request(:call, :cancel, call.id, mode) + end - # Don't remove if progress is true and the options had receive_progress true - self._requests[:call].delete(msg.call_request) unless (details[:progress] and (call and call[:o][:receive_progress])) + private - if call - details[:procedure] = call[:p] unless details[:procedure] - details[:type] = 'call' - details[:session] = self + def check_closed + if is_open? + raise RuntimeError, "session must be closed to call this method" + end + end - c = call[:c] - c.call(CallResult.new(msg.yield_arguments, msg.yield_argumentskw), nil, details) if c + def check_open + unless is_open? + raise RuntimeError, "session must be open to call this method" end + end + def make_request(name, method, *args, &callback) + self.request[name].send(method, *args, &callback) end - # Processes an error from a call request - # @param msg [Message::Error] The response from the call - def _process_CALL_error(msg) + def logger + Wamp::Client.logger + end - # Remove the pending publish and inform the caller of the failure - call = self._requests[:call].delete(msg.request_request) - if call + def send_message(msg) - details = msg.details || {} - details[:procedure] = call[:p] unless details[:procedure] - details[:type] = 'call' - details[:session] = self + # Log the message + logger.debug("#{self.class.name} TX: #{msg.to_s}") - c = call[:c] - c.call(nil, self._error_to_hash(msg), details) if c - end - + # Send it to the transport + self.transport.send_message(msg.payload) end - #endregion + def receive_message(msg) - #region Cancel Logic + # Print the raw message + logger.debug("#{self.class.name} RX(raw): #{msg.to_s}") - # Cancels a call - # @param call [Call] - The call object - # @param mode [String] - The mode of the skip. Options are 'skip', 'kill', 'killnowait' - def cancel(call, mode='skip') - unless is_open? - raise RuntimeError, "Session must be open to call 'cancel'" - end + # Parse the WAMP message + message = Message.parse(msg) - self.class.check_nil('call', call, false) + # Print the parsed WAMP message + logger.debug("#{self.class.name} RX: #{message.to_s}") - # Send the message - cancel = Message::Cancel.new(call.id, { mode: mode }) - self._send_message(cancel) - end + # Get the lookup based on the state of the session + lookup = self.is_open? ? OPEN_SESSION_METHOD_LOOKUP : CLOSED_SESSION_METHOD_LOOKUP - #endregion + # Get the type of message + type = message.is_a?(Message::Error) ? message.request_type : message.class.type - private + # Get the handler + handler = lookup[type] - # Returns the logger - # - def logger - Wamp::Client.logger + # Execute the handler + if handler != nil + # Catch any standard exception and log it + begin + handler.call(self, message) + rescue StandardError => e + logger.error("#{self.class.name} - #{e.message}") + e.backtrace.each { |line| logger.error(" #{line}") } + end + else + logger.error("#{self.class.name} unknown message type '#{type}'") + end end end end end \ No newline at end of file