lib/asir/transport.rb in asir-0.2.0 vs lib/asir/transport.rb in asir-1.0.1

- old
+ new

@@ -1,7 +1,9 @@ require 'time' +require 'asir/thread_variable' require 'asir/message/delay' +require 'asir/transport/conduit' module ASIR # !SLIDE # Transport # @@ -9,23 +11,23 @@ # Service: Receive the Message from the Client. # Service: Invoke the Message. # Service: Send the Result to the Client. # Client: Receive the Result from the Service. class Transport - include Log, Initialization, AdditionalData, Message::Delay + include Log, Initialization, AdditionalData, Message::Delay, ThreadVariable, Conduit attr_accessor :encoder, :decoder, :one_way # !SLIDE # Transport#send_message # * Encode Message. # * Send encoded Message. # * Receive decoded Result. def send_message message @message_count ||= 0; @message_count += 1 - message.create_timestamp! if needs_message_timestamp? - message.create_identifier! if needs_message_identifier? + message.create_timestamp! if needs_message_timestamp? message + message.create_identifier! if needs_message_identifier? message @before_send_message.call(self, message) if @before_send_message relative_message_delay! message message_payload = encoder.dup.encode(message) opaque_result = _send_message(message, message_payload) receive_result(message, opaque_result) @@ -55,17 +57,10 @@ # !SLIDE # Transport#send_result # Send Result to stream. def send_result result, stream, message_state message = result.message - if @on_result_exception && result.exception - begin - @on_result_exception.call(self, result) - rescue ::Exception => exc - _log { [ :send_result, :result, result, :on_result_exception, exc ] } - end - end if @one_way && message.block message.block.call(result) else result.message = nil # avoid sending back entire Message. result_payload = decoder.dup.encode(result) @@ -83,11 +78,11 @@ def receive_result message, opaque_result result_payload = _receive_result(message, opaque_result) result = decoder.dup.decode(result_payload) if result && ! message.one_way if exc = result.exception - exc.invoke! + invoker.invoke!(exc, self) else if ! @one_way && message.block message.block.call(result) end result.result @@ -103,31 +98,31 @@ # Incremented for each message sent or received. attr_accessor :message_count # A Proc to call within #receive_message, after #_receive_message. - # trans.after_receiver_message(trans, message) + # trans.after_receive_message(trans, message) attr_accessor :after_receive_message # A Proc to call within #send_message, before #_send_message. # trans.before_send_message(trans, message) attr_accessor :before_send_message - # Proc to call after #_send_result if result.exception. + # Proc to call with #invoke_message! if result.exception. # trans.on_result_exception.call(trans, result) attr_accessor :on_result_exception # Proc to call with exception, if exception occurs within #serve_message!, but outside # Message#invoke!. # - # trans.on_exception.call(trans, exception, :message, Message_instance) + # trans.on_exception.call(trans, exception, :message, Message_instance, nil) # trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance) attr_accessor :on_exception attr_accessor :needs_message_identifier, :needs_message_timestamp - alias :needs_message_identifier? :needs_message_identifier - alias :needs_message_timestamp? :needs_message_timestamp + def needs_message_identifier? m; @needs_message_identifier; end + def needs_message_timestamp? m; @needs_message_timestamp; end attr_accessor :verbose def _subclass_responsibility *args raise Error::SubclassResponsibility "subclass responsibility" @@ -152,11 +147,11 @@ nil end rescue ::Exception => exc exception = original_exception = exc _log [ :message_error, exc ] - @on_exception.call(self, exc, :message, message) if @on_exception + @on_exception.call(self, exc, :message, message, nil) if @on_exception ensure begin if message_ok if exception && ! result_ok case exception @@ -178,10 +173,11 @@ # !SLIDE pause # !SLIDE # Transport Server Support + attr_accessor :running def stop! force = false @running = false stop_server! if respond_to?(:stop_server!) raise Error::Terminate if force @@ -223,15 +219,32 @@ encoder end # Invokes the Message object, returns a Result object. def invoke_message! message - _processing_message = @processing_message - @processing_message = true - wait_for_delay! message - message.invoke! - ensure - @processing_message = _processing_message + result = nil + Transport.with_attr! :current, self do + with_attr! :message, message do + wait_for_delay! message + result = invoker.invoke!(message, self) + # Hook for Exceptions. + if @on_result_exception && result.exception + @on_result_exception.call(self, result) + end + end + end + result + end + # The current Message being handled. + attr_accessor_thread :message + + # The current active Transport. + cattr_accessor_thread :current + + # The Invoker responsible for invoking the Message. + attr_accessor :invoker + def invoker + @invoker ||= Invoker.new end # !SLIDE END # !SLIDE resume