lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.2 vs lib/plumbing/actor/threaded.rb in standard-procedure-plumbing-0.4.3
- old
+ new
@@ -16,14 +16,12 @@
end
# Send the message to the target and wrap the result
def send_message(message_name, *args, **params, &block)
Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args, **params), block, Concurrent::MVar.new).tap do |message|
- @mutex.synchronize do
- @queue << message
- send_messages if @queue.any?
- end
+ @queue << message
+ send_messages
end
end
def safely(&)
send_message(:perform_safely, &)
@@ -36,18 +34,21 @@
within_actor? ? @queue.clear : @mutex.synchronize { @queue.clear }
end
protected
- def future(&) = Concurrent::Promises.future(&)
+ def in_context(&)
+ Concurrent::Promises.future do
+ @mutex.synchronize(&)
+ end
+ end
private
def send_messages
- future do
- @mutex.synchronize do
- message = @queue.shift
- message&.call
+ in_context do
+ while (message = @queue.shift)
+ message.call
end
end
end
class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :unsafe_block, :result)