Sha256: 766beb1d66b935a7643617c739f2a92b0adc7e3a1a901e1b98bfa7142c9ecee8

Contents?: true

Size: 1.33 KB

Versions: 4

Compression:

Stored size: 1.33 KB

Contents

module PipeRpc
  # Transport endpoint used by a Hub: reads MessagePack-encoded data from an
  # input stream, writes MessagePack-encoded payloads to an output stream and
  # notifies registered callbacks about every payload sent or received.
  class Hub::Socket
    # hub  - the hub this socket belongs to (only stored by this class body).
    # args - Hash with the mandatory keys :input and :output, the streams to
    #        read from and write to.
    #
    # Raises KeyError (from Hash#fetch) if :input or :output is missing.
    def initialize(hub, args = {})
      @hub = hub
      @input = args.fetch(:input)
      @output = args.fetch(:output)
      @on_sent = []
      @on_received = []
    end

    # Reads everything currently available on the input, unpacks it and
    # passes the unpacked object to every #on_received callback.
    #
    # Returns the unpacked object. If unpacking raises MessagePack::Error an
    # ErrorResponse with code -32700 is written to the output instead and the
    # result of that #write call is returned.
    # NOTE(review): in the error case the caller receives the return value of
    # #write rather than a message -- confirm callers cope with that.
    #
    # Raises ClosedError if the input stream is already closed.
    def read
      raise ClosedError if @input.closed?

      # infinite #readpartial that also works with mruby-restricted_io
      packed = ''
      loop do
        # NOTE(review): 2 * 16 is 32 bytes per read; possibly meant as 2**16.
        # Value kept as found -- confirm before changing.
        packed << @input.sysread(2 * 16)
        # Zero timeout: only poll whether more data is ready, never block.
        break unless @input.class.select([@input], nil, nil, 0)
      end

      unpack(packed).tap do |unpacked|
        @on_received.each { |callback| callback.call(unpacked) }
      end
    rescue MessagePack::Error => e
      write ErrorResponse.new(id: nil, error: { code: -32700, data: { message: "MessagePack: #{e.message}" } })
    end

    # Converts obj with #to_h, hands the resulting payload to every #on_sent
    # callback and then writes its MessagePack encoding to the output.
    #
    # Returns the result of the output's #syswrite.
    # Raises ClosedError if the output stream is already closed.
    def write(obj)
      raise ClosedError if @output.closed?

      payload = obj.to_h
      @on_sent.each { |callback| callback.call(payload) }
      @output.syswrite payload.to_msgpack
    end

    # Registers a block that is called with each payload before it is written.
    #
    # Raises ArgumentError if no block is given; a missing block would
    # otherwise be stored as nil and only fail later inside #write.
    def on_sent(&on_sent)
      raise ArgumentError, 'on_sent requires a block' unless on_sent

      @on_sent << on_sent
    end

    # Registers a block that is called with each object unpacked by #read.
    #
    # Raises ArgumentError if no block is given; a missing block would
    # otherwise be stored as nil and only fail later inside #read.
    def on_received(&on_received)
      raise ArgumentError, 'on_received requires a block' unless on_received

      @on_received << on_received
    end

    # Closes both streams. The output is closed even when closing the input
    # raises, and streams that are already closed are left alone so calling
    # #close more than once is safe.
    def close
      @input.close unless @input.closed?
    ensure
      @output.close unless @output.closed?
    end

    # Decodes a MessagePack-encoded string. When MRUBY_VERSION is defined the
    # extra `true` argument is passed, see the inline comment.
    def unpack(packed)
      if ::Object.const_defined?(:MRUBY_VERSION)
        MessagePack.unpack(packed, true) # to create unknown symbols in mruby
      else
        MessagePack.unpack(packed)
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
pipe_rpc-2.2.1 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.2.0 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.1.0 lib/pipe_rpc/hub_socket.rb
pipe_rpc-2.0.0 lib/pipe_rpc/hub_socket.rb