Sha256: e1771283c56337b462c71ef25074ef3f72b93808bf5bcc9363d16cae05e24b82

Contents?: true

Size: 1.09 KB

Versions: 2

Compression:

Stored size: 1.09 KB

Contents

# frozen_string_literal: true

module GrpcKit
  module Streams
    class Stream
      include GrpcKit::Rpcs::Packable

      def initialize(protobuf:, session:, stream:)
        @protobuf = protobuf
        @session = session
        @stream = stream
      end

      def send(last: false)
        req = @protobuf.encode(data)
        @stream.write_send_data(pack(req), last: last)
      end

      def recv(last: false)
        data = unpack(read(last: last))

        unless data
          raise StopIteration
        end

        compressed, size, buf = *data

        unless size == buf.size
          raise "inconsistent data: #{buf}"
        end

        if compressed
          raise 'compress option is unsupported'
        end

        @protobuf.decode(buf)
      end

      private

      def read(last: false)
        loop do
          data = @stream.read_recv_data(last: last)
          if data.empty?
            if @stream.end_read?
              return nil
            end

            @session.run_once
            redo
          end

          return data
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
grpc_kit-0.1.2 lib/grpc_kit/streams/stream.rb
grpc_kit-0.1.1 lib/grpc_kit/streams/stream.rb