Sha256: e05ba6422056a59e229931bd112b28325f1c3f04b8c17ca05c4cc6bf1044d4a5

Contents?: true

Size: 1.91 KB

Versions: 1

Compression:

Stored size: 1.91 KB

Contents

require "concurrent/array"
require "concurrent/mvar"
require "concurrent/immutable_struct"
require "concurrent/promises"
require_relative "transporter"

module Plumbing
  module Actor
    class Threaded
      attr_reader :target

      def initialize target
        @target = target
        @queue = Concurrent::Array.new
        @mutex = Thread::Mutex.new
      end

      # Send the message to the target and wrap the result
      def send_message(message_name, *args, **params, &block)
        Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args, **params), block, Concurrent::MVar.new).tap do |message|
          @mutex.synchronize do
            @queue << message
            send_messages if @queue.any?
          end
        end
      end

      def safely(&)
        send_message(:perform_safely, &)
        nil
      end

      def within_actor? = @mutex.owned?

      def stop
        within_actor? ? @queue.clear : @mutex.synchronize { @queue.clear }
      end

      protected

      def future(&) = Concurrent::Promises.future(&)

      private

      def send_messages
        future do
          @mutex.synchronize do
            message = @queue.shift
            message&.call
          end
        end
      end

      class Message < Concurrent::ImmutableStruct.new(:target, :message_name, :packed_args, :unsafe_block, :result)
        def call
          args = Plumbing::Actor.transporter.unmarshal(*packed_args)
          value = target.send message_name, *args, &unsafe_block

          result.put Plumbing::Actor.transporter.marshal(value)
        rescue => ex
          result.put ex
        end

        def value
          value = Plumbing::Actor.transporter.unmarshal(*result.take(Plumbing.config.timeout)).first
          raise value if value.is_a? Exception
          value
        end
      end
    end

    def self.transporter
      @transporter ||= Plumbing::Actor::Transporter.new
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
standard-procedure-plumbing-0.4.2 lib/plumbing/actor/threaded.rb