Sha256: 6d19d70e3eca92ee86814510485188a44dc8356f4c183e845733233b7fef38f0

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

# frozen_string_literal: false

require 'forwardable'
require 'grpc_kit/session/headers'
require 'grpc_kit/session/stream_status'
require 'grpc_kit/session/recv_buffer'
require 'grpc_kit/session/send_buffer'

module GrpcKit
  module Session
    # Per-stream state holder: the stream id, its headers, the buffers of
    # data waiting to be sent / already received, trailer data, and a status
    # object tracking local/remote close state.
    #
    # Most behavior lives in the collaborating objects; this class wires them
    # together and forwards calls to them.
    class Stream
      extend Forwardable

      # Write-side completion is tracked by the send buffer.
      delegate %i[end_write end_write?] => :@pending_send_data
      # Read-side completion is tracked by the receive buffer.
      delegate %i[end_read end_read?] => :@pending_recv_data
      # Close state (whole stream, remote half, local half) is tracked by the status object.
      delegate %i[close close_remote close_local close? close_remote? close_local?] => :@status

      attr_reader :headers, :pending_send_data, :pending_recv_data, :trailer_data, :status
      # `inflight` is a flag owned by callers; nothing in this class reads it.
      attr_accessor :inflight, :stream_id

      # @param stream_id [Object] identifier of this stream (presumably the HTTP/2 stream id -- confirm against caller)
      # @param send_data [#write, nil] buffer for outgoing data; a new SendBuffer is created when nil
      # @param recv_data [#read, nil] buffer for incoming data; a new RecvBuffer is created when nil
      def initialize(stream_id:, send_data: nil, recv_data: nil)
        @stream_id = stream_id
        @headers = GrpcKit::Session::Headers.new
        @pending_send_data = send_data || GrpcKit::Session::SendBuffer.new
        @pending_recv_data = recv_data || GrpcKit::Session::RecvBuffer.new

        @inflight = false
        @trailer_data = {}
        @status = GrpcKit::Session::StreamStatus.new
        @draining = false
      end

      # Marks the stream as draining. The flag is never reset by this class.
      #
      # @return [true]
      def drain
        @draining = true
      end

      # Reports whether #drain has been called on this stream.
      #
      # @return [Boolean]
      def draining?
        @draining
      end

      # Replaces the stored trailer data wholesale (no merging with the previous value).
      #
      # @param trailer [Object] the new trailer data; the initial value is an empty Hash
      # @return [Object] the value that was stored
      def write_trailers_data(trailer)
        @trailer_data = trailer
      end

      # Forwards outgoing data to the send buffer.
      #
      # @param data [Object] payload handed to the send buffer's #write
      # @param last [Boolean] passed through to the buffer unchanged
      # @return [Object] whatever the send buffer's #write returns
      def write_send_data(data, last: false)
        @pending_send_data.write(data, last: last)
      end

      # Reads from the receive buffer.
      #
      # @param last [Boolean] passed through to the buffer unchanged
      # @return [Object] whatever the receive buffer's #read returns
      def read_recv_data(last: false)
        @pending_recv_data.read(last: last)
      end

      # Records a header on this stream via the headers object.
      #
      # @param name [Object] header name, passed to Headers#add
      # @param value [Object] header value, passed to Headers#add
      # @return [Object] whatever Headers#add returns
      def add_header(name, value)
        @headers.add(name, value)
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
grpc_kit-0.1.8 lib/grpc_kit/session/stream.rb