Sha256: 766beb1d66b935a7643617c739f2a92b0adc7e3a1a901e1b98bfa7142c9ecee8
Contents?: true
Size: 1.33 KB
Versions: 4
Compression:
Stored size: 1.33 KB
Contents
module PipeRpc
  # Wraps an input and an output stream and exchanges MessagePack-encoded
  # messages over them. Callbacks can be registered to observe every payload
  # that is sent or received.
  class Hub::Socket
    # Maximum number of bytes requested per #sysread call in #read.
    # NOTE(review): 2 * 16 is only 32 bytes; 2**16 may have been intended.
    # The value is deliberately left unchanged here -- confirm before tuning.
    READ_CHUNK_SIZE = 2 * 16

    # @param hub [Object] the owning hub; only stored, not used by this class
    # @param args [Hash] must contain the keys :input and :output, the streams
    #   to read from and write to
    # @raise [KeyError] if :input or :output is missing
    def initialize(hub, args = {})
      @hub = hub
      @input = args.fetch(:input)
      @output = args.fetch(:output)
      @on_sent = []
      @on_received = []
    end

    # Reads everything currently available on the input stream, unpacks it and
    # passes the result to all on_received callbacks.
    #
    # @return [Object] the unpacked message. If unpacking raised a
    #   MessagePack::Error, an ErrorResponse is written to the output instead
    #   and the return value is whatever #write returns.
    # @raise [ClosedError] if the input stream is closed
    def read
      raise ClosedError if @input.closed?

      # infinite #readpartial that also works with mruby-restricted_io:
      # keep appending chunks for as long as a zero-timeout select reports
      # that more data is ready on the input.
      packed = ''
      loop do
        packed << @input.sysread(READ_CHUNK_SIZE)
        break unless @input.class.select([@input], nil, nil, 0)
      end

      unpack(packed).tap do |unpacked|
        @on_received.each { |callback| callback.call(unpacked) }
      end
    rescue MessagePack::Error => e
      # Report undecodable input back to the peer.
      # -32700 is presumably the JSON-RPC "Parse error" code -- TODO confirm.
      write ErrorResponse.new(id: nil, error: { code: -32700, data: { message: "MessagePack: #{e.message}" } })
    end

    # Converts +obj+ to a hash, passes that hash to all on_sent callbacks and
    # writes its MessagePack representation to the output stream.
    #
    # @param obj [#to_h] the message to send
    # @return [Object] the result of the output stream's #syswrite
    # @raise [ClosedError] if the output stream is closed
    def write(obj)
      raise ClosedError if @output.closed?

      payload = obj.to_h
      @on_sent.each { |callback| callback.call(payload) }
      @output.syswrite payload.to_msgpack
    end

    # Registers a callback that is called with the payload hash of every
    # message before it is written.
    #
    # @yieldparam payload [Hash] the message about to be sent
    # @raise [ArgumentError] if no block is given; storing nil would otherwise
    #   make every later #write fail when the callbacks are invoked
    def on_sent(&on_sent)
      raise ArgumentError, 'block required' unless on_sent

      @on_sent << on_sent
    end

    # Registers a callback that is called with every unpacked message after it
    # has been read.
    #
    # @yieldparam unpacked [Object] the message just received
    # @raise [ArgumentError] if no block is given; storing nil would otherwise
    #   make every later #read fail when the callbacks are invoked
    def on_received(&on_received)
      raise ArgumentError, 'block required' unless on_received

      @on_received << on_received
    end

    # Closes both streams. The output stream is closed even if closing the
    # input stream raises, and streams that are already closed are skipped so
    # the method can be called more than once.
    def close
      @input.close unless @input.closed?
    ensure
      @output.close unless @output.closed?
    end

    # Decodes a MessagePack-encoded string.
    #
    # @param packed [String] raw MessagePack data
    # @return [Object] the decoded message
    def unpack(packed)
      if ::Object.const_defined?(:MRUBY_VERSION)
        MessagePack.unpack(packed, true) # to create unknown symbols in mruby
      else
        MessagePack.unpack(packed)
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
pipe_rpc-2.2.1 | lib/pipe_rpc/hub_socket.rb |
pipe_rpc-2.2.0 | lib/pipe_rpc/hub_socket.rb |
pipe_rpc-2.1.0 | lib/pipe_rpc/hub_socket.rb |
pipe_rpc-2.0.0 | lib/pipe_rpc/hub_socket.rb |