Sha256: baa8eb662c6bc16c81cc1d545f98ad30195d860ba25cc19ba8d5ede70b6d3ce8
Contents?: true
Size: 1.45 KB
Versions: 2
Compression:
Stored size: 1.45 KB
Contents
# frozen_string_literal: true require 'forwardable' require 'grpc_kit/streams/packable' module GrpcKit class Stream include GrpcKit::Streams::Packable extend Forwardable delegate %i[stream_id end_write end_read end_write? end_read?] => :@stream # @params protobuf [GrpcKit::Protobuffer] # @params session [GrpcKit::Session::Server|GrpcKit::Session::Client] # @params stream [GrpcKit::Session::Stream] primitive H2 stream id def initialize(protobuf:, session:, stream:) @protobuf = protobuf @session = session @stream = stream end def each loop { yield(recv) } end def send(data, last: false) req = @protobuf.encode(data) @stream.write_send_data(pack(req), last: last) end def recv(last: false) data = unpack(read(last: last)) unless data raise StopIteration end compressed, size, buf = *data unless size == buf.size raise "inconsistent data: #{buf}" end if compressed raise 'compress option is unsupported' end @protobuf.decode(buf) end private def read(last: false) loop do data = @stream.read_recv_data(last: last) if data.empty? # Consider `end_read` is set in this invocation if @stream.end_read? && !last return nil end @session.run_once redo end return data end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
grpc_kit-0.1.2 | lib/grpc_kit/stream.rb |
grpc_kit-0.1.1 | lib/grpc_kit/stream.rb |