lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.5 vs lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.6
- old
+ new
@@ -6,28 +6,27 @@
require_relative "transporter"
module Plumbing
module Actor
class Threaded
- attr_reader :target
-
def initialize target
@target = target
@queue = Concurrent::Array.new
@mutex = Thread::Mutex.new
end
# Send the message to the target and wrap the result
def send_message(message_name, *args, **params, &block)
- puts "->#{@target.class}##{message_name}(#{args.inspect}, #{params.inspect})\n#{Thread.current.name}" if Plumbing.config.debug
+ Plumbing.config.logger.debug { "-> #{@target.class}##{message_name}(#{args.inspect}, #{params.inspect})\n#{Thread.current.name}" }
Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args), Plumbing::Actor.transporter.marshal(params).first, block, Concurrent::MVar.new).tap do |message|
@queue << message
send_messages
end
end
def safely(&)
+ Plumbing.config.logger.debug { "-> #{@target.class}#perform_safely\n#{Thread.current.name}" }
send_message(:perform_safely, &)
nil
end
def in_context? = @mutex.owned?
@@ -56,16 +55,16 @@
class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :packed_params, :unsafe_block, :result)
def call
args = Plumbing::Actor.transporter.unmarshal(*packed_args)
params = Plumbing::Actor.transporter.unmarshal(packed_params)
- puts "=> #{target.class}##{message_name}(#{args.first.inspect}, #{params.first.inspect}, &#{!unsafe_block.nil?})\n#{Thread.current.name}" if Plumbing.config.debug
+ Plumbing.config.logger.debug { "---> #{target.class}##{message_name}(#{args.first.inspect}, #{params.first.inspect}, &#{!unsafe_block.nil?})\n#{Thread.current.name}" }
value = target.send message_name, *args, **params.first, &unsafe_block
+ Plumbing.config.logger.debug { "===> #{target.class}##{message_name} => #{value}\n#{Thread.current.name}" }
result.put Plumbing::Actor.transporter.marshal(value)
rescue => ex
- puts ex
- puts ex.backtrace
+ Plumbing.config.logger.debug { "!!!! #{target.class}##{message_name} => #{ex}\n#{Thread.current.name}" }
result.put ex
end
def value
value = Plumbing::Actor.transporter.unmarshal(*result.take(Plumbing.config.timeout)).first