Sha256: 429456d3c4c55d3ff3c12595f398143c7790c6f1cbaba38d02247e345948e11b

Contents?: true

Size: 1.63 KB

Versions: 5

Compression:

Stored size: 1.63 KB

Contents

module PipeRpc
  class Hub::Socket
    def initialize(hub, args = {})
      @hub = hub
      @input = args.fetch(:input)
      @output = args.fetch(:output)
      @on_sent = []
      @on_received = []
      @closed_reason = "lost connection to other side"
    end

    def read
      raise ClosedError.new(@closed_reason) if @input.closed?

      # infinite #readpartial that also works with mruby-restricted_io
      packed = ''
      loop do
        packed << @input.sysread(2*16)
        break unless @input.class.select([@input], nil, nil, 0)
      end

      unpack(packed).tap do |unpacked|
        @on_received.each{ |cb| cb.call(unpacked) }
      end
    rescue MessagePack::Error => e
      write ErrorResponse.new(id: nil, error: { code: -32700, data: { message: "MessagePack: #{e.message}" } })
    end

    def write(obj)
      raise ClosedError.new(@closed_reason) if @output.closed?
      payload = obj.to_h
      @on_sent.each{ |callback| callback.call(payload) }
      @output.syswrite payload.to_msgpack
    end

    def on_sent(&on_sent)
      @on_sent << on_sent
      on_sent
    end

    def off_sent(callback)
      @on_sent.delete(callback)
    end

    def on_received(&on_received)
      @on_received << on_received
      on_received
    end

    def off_received(callback)
      @on_received.delete(callback)
    end

    def close(reason)
      @closed_reason = reason
      @input.close
      @output.close
    end

    def unpack(packed)
      if ::Object.const_defined?(:MRUBY_VERSION)
        MessagePack.unpack(packed, true) # to create unknown symbols in mruby
      else
        MessagePack.unpack(packed)
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
pipe_rpc-2.5.0 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.4.0 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.3.0 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.2.3 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.2.2 lib/pipe_rpc/hub_socket.rb