Sha256: baa8eb662c6bc16c81cc1d545f98ad30195d860ba25cc19ba8d5ede70b6d3ce8

Contents?: true

Size: 1.45 KB

Versions: 2

Compression:

Stored size: 1.45 KB

Contents

# frozen_string_literal: true

require 'forwardable'
require 'grpc_kit/streams/packable'

module GrpcKit
  # Message-level wrapper around a single stream object.
  #
  # Outgoing messages are encoded with the supplied protobuf codec and packed
  # (via GrpcKit::Streams::Packable) before being written; incoming data is
  # read from the stream, unpacked and decoded.
  class Stream
    include GrpcKit::Streams::Packable

    extend Forwardable

    delegate %i[stream_id end_write end_read end_write? end_read?] => :@stream

    # @param protobuf [GrpcKit::Protobuffer]
    # @param session [GrpcKit::Session::Server|GrpcKit::Session::Client]
    # @param stream [GrpcKit::Session::Stream] primitive H2 stream
    def initialize(protobuf:, session:, stream:)
      @protobuf = protobuf
      @session = session
      @stream = stream
    end

    # Yields every message returned by #recv. Iteration ends when #recv raises
    # StopIteration, which Kernel#loop rescues.
    #
    # @yieldparam message [Object] value returned by #recv
    # @return [Enumerator] when called without a block
    def each
      # Without a block, hand back a lazy enumerator instead of consuming a
      # message and then failing on `yield`.
      return enum_for(:each) unless block_given?

      loop { yield(recv) }
    end

    # Encodes +data+ with the protobuf codec, packs it and writes it to the
    # underlying stream.
    #
    # NOTE: this method shadows Object#send on instances of this class; use
    # #__send__ or #public_send for dynamic dispatch.
    #
    # @param data [Object] message passed to the codec's #encode
    # @param last [Boolean] forwarded unchanged to the stream's #write_send_data
    def send(data, last: false)
      req = @protobuf.encode(data)
      @stream.write_send_data(pack(req), last: last)
    end

    # Reads one packed message from the stream and decodes it.
    #
    # @param last [Boolean] forwarded unchanged to the underlying read
    # @return [Object] value returned by the codec's #decode
    # @raise [StopIteration] when #unpack yields nothing for the data read
    # @raise [RuntimeError] when the unpacked size differs from the payload
    #   size, or when the message is flagged as compressed
    def recv(last: false)
      data = unpack(read(last: last))
      raise StopIteration unless data

      compressed, size, buf = *data
      raise "inconsistent data: #{buf}" unless size == buf.size
      raise 'compress option is unsupported' if compressed

      @protobuf.decode(buf)
    end

    private

    # Polls the stream until it returns non-empty data, driving the session
    # between attempts.
    #
    # @param last [Boolean] forwarded unchanged to the stream's #read_recv_data
    # @return [Object, nil] the non-empty data, or nil when the stream reports
    #   end_read? and +last+ is false
    def read(last: false)
      loop do
        data = @stream.read_recv_data(last: last)
        return data unless data.empty?

        # Consider `end_read` is set in this invocation
        return nil if @stream.end_read? && !last

        # Nothing buffered yet: let the session make progress, then retry.
        @session.run_once
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
grpc_kit-0.1.2 lib/grpc_kit/stream.rb
grpc_kit-0.1.1 lib/grpc_kit/stream.rb