lib/grpc_kit/session/stream.rb in grpc_kit-0.1.2 vs lib/grpc_kit/session/stream.rb in grpc_kit-0.1.3
- old
+ new
@@ -1,56 +1,45 @@
# frozen_string_literal: false
require 'forwardable'
require 'grpc_kit/session/buffer'
require 'grpc_kit/session/headers'
+require 'grpc_kit/session/stream_status'
module GrpcKit
module Session
class Stream
extend Forwardable
- delegate end_write: :@pending_send_data
- delegate end_read: :@pending_recv_data
+ delegate %i[end_write end_write?] => :@pending_send_data
+ delegate %i[end_read end_read?] => :@pending_recv_data
+ delegate %i[close close_remote close_local close? close_remote? close_local?] => :@status
- attr_reader :headers, :pending_send_data, :pending_recv_data
- attr_accessor :local_end_stream, :remote_end_stream, :inflight, :stream_id
+ attr_reader :headers, :pending_send_data, :pending_recv_data, :trailer_data, :status
+ attr_accessor :inflight, :stream_id
- def initialize(stream_id:, send_data: GrpcKit::Session::Buffer.new)
+ def initialize(stream_id:, send_data: nil, recv_data: nil)
@stream_id = stream_id
@end_read_stream = false
@headers = GrpcKit::Session::Headers.new
- @pending_send_data = send_data
- @pending_recv_data = GrpcKit::Session::Buffer.new
+ @pending_send_data = send_data || GrpcKit::Session::Buffer.new
+ @pending_recv_data = recv_data || GrpcKit::Session::Buffer.new
- @local_end_stream = false
- @remote_end_stream = false
@inflight = false
+ @trailer_data = {}
+ @status = GrpcKit::Session::StreamStatus.new
end
+ def write_trailers_data(tariler)
+ @trailer_data = tariler
+ end
+
def write_send_data(data, last: false)
@pending_send_data.write(data, last: last)
end
def read_recv_data(last: false)
@pending_recv_data.read(last: last)
- end
-
- def end_write?
- @local_end_stream || @pending_send_data.end_write?
- end
-
- def end_read?
- @remote_end_stream || @pending_recv_data.end_read?
- end
-
- def end_stream?
- end_read? && end_write?
- end
-
- def end_stream
- end_read
- end_write
end
def add_header(name, value)
@headers.add(name, value)
end