lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.2 vs lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.3

- old
+ new

@@ -16,14 +16,12 @@ end # Send the message to the target and wrap the result def send_message(message_name, *args, **params, &block) Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args, **params), block, Concurrent::MVar.new).tap do |message| - @mutex.synchronize do - @queue << message - send_messages if @queue.any? - end + @queue << message + send_messages end end def safely(&) send_message(:perform_safely, &) @@ -36,18 +34,21 @@ within_actor? ? @queue.clear : @mutex.synchronize { @queue.clear } end protected - def future(&) = Concurrent::Promises.future(&) + def in_context(&) + Concurrent::Promises.future do + @mutex.synchronize(&) + end + end private def send_messages - future do - @mutex.synchronize do - message = @queue.shift - message&.call + in_context do + while (message = @queue.shift) + message.call end end end class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :unsafe_block, :result)