lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.5 vs lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.6

- old
+ new

@@ -6,28 +6,27 @@ require_relative "transporter" module Plumbing module Actor class Threaded - attr_reader :target - def initialize target @target = target @queue = Concurrent::Array.new @mutex = Thread::Mutex.new end # Send the message to the target and wrap the result def send_message(message_name, *args, **params, &block) - puts "->#{@target.class}##{message_name}(#{args.inspect}, #{params.inspect})\n#{Thread.current.name}" if Plumbing.config.debug + Plumbing.config.logger.debug { "-> #{@target.class}##{message_name}(#{args.inspect}, #{params.inspect})\n#{Thread.current.name}" } Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args), Plumbing::Actor.transporter.marshal(params).first, block, Concurrent::MVar.new).tap do |message| @queue << message send_messages end end def safely(&) + Plumbing.config.logger.debug { "-> #{@target.class}#perform_safely\n#{Thread.current.name}" } send_message(:perform_safely, &) nil end def in_context? = @mutex.owned? @@ -56,16 +55,16 @@ class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :packed_params, :unsafe_block, :result) def call args = Plumbing::Actor.transporter.unmarshal(*packed_args) params = Plumbing::Actor.transporter.unmarshal(packed_params) - puts "=> #{target.class}##{message_name}(#{args.first.inspect}, #{params.first.inspect}, &#{!unsafe_block.nil?})\n#{Thread.current.name}" if Plumbing.config.debug + Plumbing.config.logger.debug { "---> #{target.class}##{message_name}(#{args.first.inspect}, #{params.first.inspect}, &#{!unsafe_block.nil?})\n#{Thread.current.name}" } value = target.send message_name, *args, **params.first, &unsafe_block + Plumbing.config.logger.debug { "===> #{target.class}##{message_name} => #{value}\n#{Thread.current.name}" } result.put Plumbing::Actor.transporter.marshal(value) rescue => ex - puts ex - puts ex.backtrace + Plumbing.config.logger.debug { "!!!! #{target.class}##{message_name} => #{ex}\n#{Thread.current.name}" } result.put ex end def value value = Plumbing::Actor.transporter.unmarshal(*result.take(Plumbing.config.timeout)).first