lib/asir/transport.rb in asir-0.2.0 vs lib/asir/transport.rb in asir-1.0.1
- old
+ new
@@ -1,7 +1,9 @@
require 'time'
+require 'asir/thread_variable'
require 'asir/message/delay'
+require 'asir/transport/conduit'
module ASIR
# !SLIDE
# Transport
#
@@ -9,23 +11,23 @@
# Service: Receive the Message from the Client.
# Service: Invoke the Message.
# Service: Send the Result to the Client.
# Client: Receive the Result from the Service.
class Transport
- include Log, Initialization, AdditionalData, Message::Delay
+ include Log, Initialization, AdditionalData, Message::Delay, ThreadVariable, Conduit
attr_accessor :encoder, :decoder, :one_way
# !SLIDE
# Transport#send_message
# * Encode Message.
# * Send encoded Message.
# * Receive decoded Result.
def send_message message
@message_count ||= 0; @message_count += 1
- message.create_timestamp! if needs_message_timestamp?
- message.create_identifier! if needs_message_identifier?
+ message.create_timestamp! if needs_message_timestamp? message
+ message.create_identifier! if needs_message_identifier? message
@before_send_message.call(self, message) if @before_send_message
relative_message_delay! message
message_payload = encoder.dup.encode(message)
opaque_result = _send_message(message, message_payload)
receive_result(message, opaque_result)
@@ -55,17 +57,10 @@
# !SLIDE
# Transport#send_result
# Send Result to stream.
def send_result result, stream, message_state
message = result.message
- if @on_result_exception && result.exception
- begin
- @on_result_exception.call(self, result)
- rescue ::Exception => exc
- _log { [ :send_result, :result, result, :on_result_exception, exc ] }
- end
- end
if @one_way && message.block
message.block.call(result)
else
result.message = nil # avoid sending back entire Message.
result_payload = decoder.dup.encode(result)
@@ -83,11 +78,11 @@
def receive_result message, opaque_result
result_payload = _receive_result(message, opaque_result)
result = decoder.dup.decode(result_payload)
if result && ! message.one_way
if exc = result.exception
- exc.invoke!
+ invoker.invoke!(exc, self)
else
if ! @one_way && message.block
message.block.call(result)
end
result.result
@@ -103,31 +98,31 @@
# Incremented for each message sent or received.
attr_accessor :message_count
# A Proc to call within #receive_message, after #_receive_message.
- # trans.after_receiver_message(trans, message)
+ # trans.after_receive_message(trans, message)
attr_accessor :after_receive_message
# A Proc to call within #send_message, before #_send_message.
# trans.before_send_message(trans, message)
attr_accessor :before_send_message
- # Proc to call after #_send_result if result.exception.
+ # Proc to call with #invoke_message! if result.exception.
# trans.on_result_exception.call(trans, result)
attr_accessor :on_result_exception
# Proc to call with exception, if exception occurs within #serve_message!, but outside
# Message#invoke!.
#
- # trans.on_exception.call(trans, exception, :message, Message_instance)
+ # trans.on_exception.call(trans, exception, :message, Message_instance, nil)
# trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance)
attr_accessor :on_exception
attr_accessor :needs_message_identifier, :needs_message_timestamp
- alias :needs_message_identifier? :needs_message_identifier
- alias :needs_message_timestamp? :needs_message_timestamp
+ def needs_message_identifier? m; @needs_message_identifier; end
+ def needs_message_timestamp? m; @needs_message_timestamp; end
attr_accessor :verbose
def _subclass_responsibility *args
raise Error::SubclassResponsibility "subclass responsibility"
@@ -152,11 +147,11 @@
nil
end
rescue ::Exception => exc
exception = original_exception = exc
_log [ :message_error, exc ]
- @on_exception.call(self, exc, :message, message) if @on_exception
+ @on_exception.call(self, exc, :message, message, nil) if @on_exception
ensure
begin
if message_ok
if exception && ! result_ok
case exception
@@ -178,10 +173,11 @@
# !SLIDE pause
# !SLIDE
# Transport Server Support
+ attr_accessor :running
def stop! force = false
@running = false
stop_server! if respond_to?(:stop_server!)
raise Error::Terminate if force
@@ -223,15 +219,32 @@
encoder
end
# Invokes the Message object, returns a Result object.
def invoke_message! message
- _processing_message = @processing_message
- @processing_message = true
- wait_for_delay! message
- message.invoke!
- ensure
- @processing_message = _processing_message
+ result = nil
+ Transport.with_attr! :current, self do
+ with_attr! :message, message do
+ wait_for_delay! message
+ result = invoker.invoke!(message, self)
+ # Hook for Exceptions.
+ if @on_result_exception && result.exception
+ @on_result_exception.call(self, result)
+ end
+ end
+ end
+ result
+ end
+ # The current Message being handled.
+ attr_accessor_thread :message
+
+ # The current active Transport.
+ cattr_accessor_thread :current
+
+ # The Invoker responsible for invoking the Message.
+ attr_accessor :invoker
+ def invoker
+ @invoker ||= Invoker.new
end
# !SLIDE END
# !SLIDE resume