lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.0 vs lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.1

- old
+ new

@@ -10,45 +10,59 @@ attr_reader :target def initialize target @target = target @queue = Concurrent::Array.new + @mutex = Thread::Mutex.new end # Send the message to the target and wrap the result def send_message message_name, *args, &block Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args), block, Concurrent::MVar.new).tap do |message| - @queue << message - send_messages if @queue.size == 1 + @mutex.synchronize do + @queue << message + send_messages if @queue.any? + end end end - protected + def safely(&) + send_message(:perform_safely, &) + nil + end - def future(&) - Concurrent::Promises.future(&) + def within_actor? = @mutex.owned? + + def stop + within_actor? ? @queue.clear : @mutex.synchronize { @queue.clear } end + protected + + def future(&) = Concurrent::Promises.future(&) + private def send_messages future do - while (message = @queue.shift) - message.call + @mutex.synchronize do + message = @queue.shift + message&.call end end end class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :unsafe_block, :result) def call args = Plumbing::Actor.transporter.unmarshal(*packed_args) value = target.send message_name, *args, &unsafe_block + result.put Plumbing::Actor.transporter.marshal(value) rescue => ex result.put ex end - def await + def value value = Plumbing::Actor.transporter.unmarshal(*result.take(Plumbing.config.timeout)).first raise value if value.is_a? Exception value end end