lib/asir/transport.rb in asir-1.1.12 vs lib/asir/transport.rb in asir-1.2.0

- old
+ new

@@ -1,8 +1,9 @@ require 'time' require 'asir/thread_variable' require 'asir/message/delay' +require 'asir/message/state' require 'asir/transport/conduit' module ASIR # !SLIDE # Transport @@ -21,50 +22,48 @@ # Transport#send_message # * Encode Message. # * Send encoded Message. # * Receive decoded Result. def send_message message - @message_count ||= 0; @message_count += 1 + @message_count ||= 0; @message_count += 1 # NOT THREAD-SAFE message.create_timestamp! if needs_message_timestamp? message message.create_identifier! if needs_message_identifier? message - @before_send_message.call(self, message) if @before_send_message relative_message_delay! message - message_payload = encoder.prepare.encode(message) - opaque_result = _send_message(message, message_payload) - receive_result(message, opaque_result) + state = Message::State.new(:message => message, :message_payload => encoder.prepare.encode(message)) + @before_send_message.call(self, state) if @before_send_message + _send_message(state) + receive_result(state) end # !SLIDE # Transport#receive_message # Receive Message payload from stream. - def receive_message stream - # $stderr.puts " #{$$} #{self} receive_message #{stream}" - @message_count ||= 0; @message_count += 1 - additional_data = { } - if req_and_state = _receive_message(stream, additional_data) - message = req_and_state[0] = encoder.prepare.decode(req_and_state.first) - message.additional_data!.update(additional_data) if message - if @after_receive_message - @after_receive_message.call(self, message) + def receive_message state + @message_count ||= 0; @message_count += 1 # NOT THREAD-SAFE + if received = _receive_message(state) + if message = state.message = encoder.prepare.decode(state.message_payload) + message.additional_data!.update(state.additional_data) if state.additional_data + @after_receive_message.call(self, state) if @after_receive_message + self end end - req_and_state end # !SLIDE END # !SLIDE # Transport#send_result # Send Result to stream. - def send_result result, stream, message_state - message = result.message + def send_result state + result = state.result + message = state.message if @one_way && message.block message.block.call(result) else - # avoid sending back entire Message. + # Avoid sending back entire Message in Result. result.message = nil unless @coder_needs_result_message - result_payload = decoder.prepare.encode(result) - _send_result(message, result, result_payload, stream, message_state) + state.result_payload = decoder.prepare.encode(result) + _send_result(state) end end attr_accessor :coder_needs_result_message # !SLIDE END @@ -73,23 +72,28 @@ # Transport#receive_result # Receieve Result from stream: # * Receive Result payload # * Decode Result. # * Extract Result result or exception. - def receive_result message, opaque_result - result_payload = _receive_result(message, opaque_result) - result = decoder.prepare.decode(result_payload) + # * Invoke Exception or return Result value. + def receive_result state + value = nil + return value unless _receive_result(state) + result = state.result ||= decoder.prepare.decode(state.result_payload) + message = state.message if result && ! message.one_way + result.message = message if exc = result.exception invoker.invoke!(exc, self) else if ! @one_way && message.block message.block.call(result) end - result.result + value = result.result end end + value end # !SLIDE END def initialize *args @verbose = 0 @@ -116,12 +120,12 @@ attr_accessor :after_invoke_message # Proc to call with exception, if exception occurs within #serve_message!, but outside # Message#invoke!. # - # trans.on_exception.call(trans, exception, :message, Message_instance, nil) - # trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance) + # trans.on_exception.call(trans, exception, :message, message_state) + # trans.on_exception.call(trans, exception, :result, message_state) attr_accessor :on_exception attr_accessor :needs_message_identifier, :needs_message_timestamp def needs_message_identifier? m; @needs_message_identifier; end def needs_message_timestamp? m; @needs_message_timestamp; end @@ -137,45 +141,45 @@ alias :_receive_result :_subclass_responsibility # !SLIDE # Serve a Message. def serve_message! in_stream, out_stream - message = message_state = message_ok = result = result_ok = nil + state = message_ok = result = result_ok = nil exception = original_exception = unforwardable_exception = nil - message, message_state = receive_message(in_stream) - if message + state = Message::State.new(:in_stream => in_stream, :out_stream => out_stream) + if receive_message(state) message_ok = true - result = invoke_message!(message) + invoke_message!(state) result_ok = true if @after_invoke_message - @after_invoke_message.call(self, message, result) + @after_invoke_message.call(self, state) end self else nil end rescue ::Exception => exc exception = original_exception = exc _log [ :message_error, exc ] - @on_exception.call(self, exc, :message, message, nil) if @on_exception + @on_exception.call(self, exc, :message, state) if @on_exception ensure begin if message_ok if exception && ! result_ok case exception when *Error::Unforwardable.unforwardable unforwardable_exception = exception = Error::Unforwardable.new(exception) end - result = Result.new(message, nil, exception) + state.result = Result.new(state.message, nil, exception) end if out_stream - send_result(result, out_stream, message_state) + send_result(state) end end rescue ::Exception => exc - _log [ :result_error, exc ] - @on_exception.call(self, exc, :result, message, result) if @on_exception + _log [ :result_error, exc, exc.backtrace ] + @on_exception.call(self, exc, :result, state) if @on_exception end raise original_exception if unforwardable_exception end # !SLIDE pause @@ -225,24 +229,28 @@ @decoder ||= encoder end # Invokes the Message object, returns a Result object. - def invoke_message! message - result = nil + def invoke_message! state Transport.with_attr! :current, self do - with_attr! :message, message do - wait_for_delay! message - result = invoker.invoke!(message, self) + with_attr! :message_state, state do + with_attr! :message, state.message do + wait_for_delay! state.message + state.result = invoker.invoke!(state.message, self) # Hook for Exceptions. - if @on_result_exception && result.exception - @on_result_exception.call(self, result) + if @on_result_exception && state.result.exception + @on_result_exception.call(self, state) end end end - result + end + self end - # The current Message being handled. + + # The current Message::State. + attr_accessor_thread :message_state + # The current Message being invoked. DEPRECATED. attr_accessor_thread :message # The current active Transport. cattr_accessor_thread :current