Sha256: 52724d452629eeae38c98dea8a799f0203ede9d8c352fbdcdb9e101126d8a04b

Contents?: true

Size: 1.41 KB

Versions: 2

Compression:

Stored size: 1.41 KB

Contents

# frozen_string_literal: false

require 'forwardable'
require 'grpc_kit/session/buffer'
require 'grpc_kit/session/headers'

module GrpcKit
  module Session
    class Stream
      extend Forwardable

      delegate end_write: :@pending_send_data
      delegate end_read: :@pending_recv_data

      attr_reader :headers, :pending_send_data, :pending_recv_data
      attr_accessor :local_end_stream, :remote_end_stream, :inflight, :stream_id

      def initialize(stream_id:, send_data: GrpcKit::Session::Buffer.new)
        @stream_id = stream_id
        @end_read_stream = false
        @headers = GrpcKit::Session::Headers.new
        @pending_send_data = send_data
        @pending_recv_data = GrpcKit::Session::Buffer.new

        @local_end_stream = false
        @remote_end_stream = false
        @inflight = false
      end

      def write_send_data(data, last: false)
        @pending_send_data.write(data, last: last)
      end

      def read_recv_data(last: false)
        @pending_recv_data.read(last: last)
      end

      def end_write?
        @local_end_stream || @pending_send_data.end_write?
      end

      def end_read?
        @remote_end_stream || @pending_recv_data.end_read?
      end

      def end_stream?
        end_read? && end_write?
      end

      def end_stream
        end_read
        end_write
      end

      def add_header(name, value)
        @headers.add(name, value)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
grpc_kit-0.1.2 lib/grpc_kit/session/stream.rb
grpc_kit-0.1.1 lib/grpc_kit/session/stream.rb