lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.3 vs lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.4

- old
+ new

@@ -1,7 +1,8 @@ require "concurrent/array" require "concurrent/mvar" +require "concurrent/scheduled_task" require "concurrent/immutable_struct" require "concurrent/promises" require_relative "transporter" module Plumbing @@ -15,51 +16,56 @@ @mutex = Thread::Mutex.new end # Send the message to the target and wrap the result def send_message(message_name, *args, **params, &block) - Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args, **params), block, Concurrent::MVar.new).tap do |message| + puts "->#{@target.class}##{message_name}(#{args.inspect}, #{params.inspect})\n#{Thread.current.name}" if Plumbing.config.debug + Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args), Plumbing::Actor.transporter.marshal(params).first, block, Concurrent::MVar.new).tap do |message| @queue << message send_messages end end def safely(&) send_message(:perform_safely, &) nil end - def within_actor? = @mutex.owned? + def in_context? = @mutex.owned? - def stop - within_actor? ? @queue.clear : @mutex.synchronize { @queue.clear } - end + def stop = @queue.clear protected - def in_context(&) - Concurrent::Promises.future do - @mutex.synchronize(&) + def in_actor_thread &block + Concurrent::ScheduledTask.execute(0.1) do + @mutex.synchronize(&block) end end private def send_messages - in_context do - while (message = @queue.shift) - message.call - end + in_context? ? dispatch_messages : in_actor_thread { dispatch_messages } + end + + def dispatch_messages + while (message = @queue.shift) + message.call end end - class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :unsafe_block, :result) + class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :packed_params, :unsafe_block, :result) def call args = Plumbing::Actor.transporter.unmarshal(*packed_args) - value = target.send message_name, *args, &unsafe_block + params = Plumbing::Actor.transporter.unmarshal(packed_params) + puts "=> #{target.class}##{message_name}(#{args.first.inspect}, #{params.first.inspect}, &#{!unsafe_block.nil?})\n#{Thread.current.name}" if Plumbing.config.debug + value = target.send message_name, *args, **params.first, &unsafe_block result.put Plumbing::Actor.transporter.marshal(value) rescue => ex + puts ex + puts ex.backtrace result.put ex end def value value = Plumbing::Actor.transporter.unmarshal(*result.take(Plumbing.config.timeout)).first