Sha256: f171324083a5fef649669b4592a716a814083c0e44d918d28900b34a015ab387

Contents?: true

Size: 1.7 KB

Versions: 4

Compression:

Stored size: 1.7 KB

Contents

require 'weakref'

module DCell
  class RPC < Celluloid::SyncCall
    # Builds an RPC from a call ID, the caller, the method to invoke, its
    # arguments and an optional block. Each argument is stored unchanged in
    # the instance variable of the same name.
    def initialize(id, caller, method, arguments, block)
      @id        = id
      @caller    = caller
      @method    = method
      @arguments = arguments
      @block     = block
    end

    # Custom marshaller for compatibility with Celluloid::Mailbox marshalling
    def _dump(level)
      payload = Marshal.dump [@caller, @method, @arguments, @block]
      "#{@id}:#{payload}"
    end

    # Loader for custom marshal format
    #
    # Splits the wire string at its first ":" into a call ID of the form
    # "<uuid>@<node_id>:" and a Marshal payload. When the node ID equals
    # DCell.id, the call is looked up with Manager.claim(uuid) and the
    # payload is ignored; otherwise a new RPC is built from the unmarshalled
    # payload.
    #
    # The input string is left unmodified, so frozen strings are accepted.
    #
    # Raises ArgumentError if the string has no ":" separator or if the call
    # ID does not match the expected format.
    #
    # NOTE(review): Marshal.load can instantiate arbitrary objects; confirm
    # this data only ever arrives from trusted peers.
    def self._load(string)
      separator = string.index(":")
      # Without this guard a missing separator surfaces as NoMethodError on nil
      raise ArgumentError, "couldn't parse call ID" unless separator

      id      = string[0..separator]
      payload = string[(separator + 1)..-1]

      # \A and \z anchor to the whole ID; ^ and $ would also match at line breaks
      match = id.match(/\A([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})@(.+?):\z/)
      raise ArgumentError, "couldn't parse call ID" unless match

      uuid, node_id = match[1], match[2]

      if DCell.id == node_id
        Manager.claim uuid
      else
        caller, method, arguments, block = Marshal.load(payload)
        RPC.new("#{uuid}@#{node_id}", caller, method, arguments, block)
      end
    end

    # Tracks calls-in-flight
    #
    # Maps generated call IDs to weak references to the calls registered
    # under them, so a call can later be retrieved by ID with .claim. All
    # state lives in class-level instance variables guarded by one Mutex.
    class Manager
      @mutex  = Mutex.new
      @ids    = {} # call.object_id => call ID
      @calls  = {} # call ID => [call.object_id, WeakRef to the call]

      # Registers +call+ and returns its call ID. Registering the same object
      # again before it has been claimed returns the ID it was already given;
      # otherwise a new ID is taken from Celluloid.uuid.
      def self.register(call)
        @mutex.synchronize do
          key = call.object_id
          call_id = (@ids[key] ||= Celluloid.uuid)

          @calls[call_id] = [key, WeakRef.new(call)]
          call_id
        end
      end

      # Removes the call registered under +call_id+ and returns it. Returns
      # nil when the ID is unknown, was already claimed, or the call has been
      # garbage collected.
      def self.claim(call_id)
        @mutex.synchronize do
          key, ref = @calls.delete(call_id)
          return nil unless ref

          # Forget the object_id mapping too, so @ids does not grow without bound
          @ids.delete(key) if @ids[key] == call_id

          begin
            ref.__getobj__
          rescue WeakRef::RefError
            # The call was collected before it could be claimed
            nil
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
dcell-0.13.0 lib/dcell/rpc.rb
dcell-0.13.0.pre lib/dcell/rpc.rb
dcell-0.12.0.pre lib/dcell/rpc.rb
dcell-0.10.0 lib/dcell/rpc.rb