lib/asir/transport.rb in asir-1.1.12 vs lib/asir/transport.rb in asir-1.2.0
- old
+ new
@@ -1,8 +1,9 @@
require 'time'
require 'asir/thread_variable'
require 'asir/message/delay'
+require 'asir/message/state'
require 'asir/transport/conduit'
module ASIR
# !SLIDE
# Transport
@@ -21,50 +22,48 @@
# Transport#send_message
# * Encode Message.
# * Send encoded Message.
# * Receive decoded Result.
def send_message message
- @message_count ||= 0; @message_count += 1
+ @message_count ||= 0; @message_count += 1 # NOT THREAD-SAFE
message.create_timestamp! if needs_message_timestamp? message
message.create_identifier! if needs_message_identifier? message
- @before_send_message.call(self, message) if @before_send_message
relative_message_delay! message
- message_payload = encoder.prepare.encode(message)
- opaque_result = _send_message(message, message_payload)
- receive_result(message, opaque_result)
+ state = Message::State.new(:message => message, :message_payload => encoder.prepare.encode(message))
+ @before_send_message.call(self, state) if @before_send_message
+ _send_message(state)
+ receive_result(state)
end
# !SLIDE
# Transport#receive_message
# Receive Message payload from stream.
- def receive_message stream
- # $stderr.puts " #{$$} #{self} receive_message #{stream}"
- @message_count ||= 0; @message_count += 1
- additional_data = { }
- if req_and_state = _receive_message(stream, additional_data)
- message = req_and_state[0] = encoder.prepare.decode(req_and_state.first)
- message.additional_data!.update(additional_data) if message
- if @after_receive_message
- @after_receive_message.call(self, message)
+ def receive_message state
+ @message_count ||= 0; @message_count += 1 # NOT THREAD-SAFE
+ if received = _receive_message(state)
+ if message = state.message = encoder.prepare.decode(state.message_payload)
+ message.additional_data!.update(state.additional_data) if state.additional_data
+ @after_receive_message.call(self, state) if @after_receive_message
+ self
end
end
- req_and_state
end
# !SLIDE END
# !SLIDE
# Transport#send_result
# Send Result to stream.
- def send_result result, stream, message_state
- message = result.message
+ def send_result state
+ result = state.result
+ message = state.message
if @one_way && message.block
message.block.call(result)
else
- # avoid sending back entire Message.
+ # Avoid sending back entire Message in Result.
result.message = nil unless @coder_needs_result_message
- result_payload = decoder.prepare.encode(result)
- _send_result(message, result, result_payload, stream, message_state)
+ state.result_payload = decoder.prepare.encode(result)
+ _send_result(state)
end
end
attr_accessor :coder_needs_result_message
# !SLIDE END
@@ -73,23 +72,28 @@
# Transport#receive_result
# Receieve Result from stream:
# * Receive Result payload
# * Decode Result.
# * Extract Result result or exception.
- def receive_result message, opaque_result
- result_payload = _receive_result(message, opaque_result)
- result = decoder.prepare.decode(result_payload)
+ # * Invoke Exception or return Result value.
+ def receive_result state
+ value = nil
+ return value unless _receive_result(state)
+ result = state.result ||= decoder.prepare.decode(state.result_payload)
+ message = state.message
if result && ! message.one_way
+ result.message = message
if exc = result.exception
invoker.invoke!(exc, self)
else
if ! @one_way && message.block
message.block.call(result)
end
- result.result
+ value = result.result
end
end
+ value
end
# !SLIDE END
def initialize *args
@verbose = 0
@@ -116,12 +120,12 @@
attr_accessor :after_invoke_message
# Proc to call with exception, if exception occurs within #serve_message!, but outside
# Message#invoke!.
#
- # trans.on_exception.call(trans, exception, :message, Message_instance, nil)
- # trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance)
+ # trans.on_exception.call(trans, exception, :message, message_state)
+ # trans.on_exception.call(trans, exception, :result, message_state)
attr_accessor :on_exception
attr_accessor :needs_message_identifier, :needs_message_timestamp
def needs_message_identifier? m; @needs_message_identifier; end
def needs_message_timestamp? m; @needs_message_timestamp; end
@@ -137,45 +141,45 @@
alias :_receive_result :_subclass_responsibility
# !SLIDE
# Serve a Message.
def serve_message! in_stream, out_stream
- message = message_state = message_ok = result = result_ok = nil
+ state = message_ok = result = result_ok = nil
exception = original_exception = unforwardable_exception = nil
- message, message_state = receive_message(in_stream)
- if message
+ state = Message::State.new(:in_stream => in_stream, :out_stream => out_stream)
+ if receive_message(state)
message_ok = true
- result = invoke_message!(message)
+ invoke_message!(state)
result_ok = true
if @after_invoke_message
- @after_invoke_message.call(self, message, result)
+ @after_invoke_message.call(self, state)
end
self
else
nil
end
rescue ::Exception => exc
exception = original_exception = exc
_log [ :message_error, exc ]
- @on_exception.call(self, exc, :message, message, nil) if @on_exception
+ @on_exception.call(self, exc, :message, state) if @on_exception
ensure
begin
if message_ok
if exception && ! result_ok
case exception
when *Error::Unforwardable.unforwardable
unforwardable_exception = exception = Error::Unforwardable.new(exception)
end
- result = Result.new(message, nil, exception)
+ state.result = Result.new(state.message, nil, exception)
end
if out_stream
- send_result(result, out_stream, message_state)
+ send_result(state)
end
end
rescue ::Exception => exc
- _log [ :result_error, exc ]
- @on_exception.call(self, exc, :result, message, result) if @on_exception
+ _log [ :result_error, exc, exc.backtrace ]
+ @on_exception.call(self, exc, :result, state) if @on_exception
end
raise original_exception if unforwardable_exception
end
# !SLIDE pause
@@ -225,24 +229,28 @@
@decoder ||=
encoder
end
# Invokes the Message object, returns a Result object.
- def invoke_message! message
- result = nil
+ def invoke_message! state
Transport.with_attr! :current, self do
- with_attr! :message, message do
- wait_for_delay! message
- result = invoker.invoke!(message, self)
+ with_attr! :message_state, state do
+ with_attr! :message, state.message do
+ wait_for_delay! state.message
+ state.result = invoker.invoke!(state.message, self)
# Hook for Exceptions.
- if @on_result_exception && result.exception
- @on_result_exception.call(self, result)
+ if @on_result_exception && state.result.exception
+ @on_result_exception.call(self, state)
end
end
end
- result
+ end
+ self
end
- # The current Message being handled.
+
+ # The current Message::State.
+ attr_accessor_thread :message_state
+ # The current Message being invoked. DEPRECATED.
attr_accessor_thread :message
# The current active Transport.
cattr_accessor_thread :current