Sha256: 5fa8c77560e843e83074d3fc0b9514831826462c5f7310ac6c1c95d36b6e17d5
Contents?: true
Size: 1.86 KB
Versions: 3
Compression:
Stored size: 1.86 KB
Contents
# frozen_string_literal: true module GrpcKit module Session class RecvBuffer class Closed < Exception; end def initialize @buffer = +''.b @end = false @queue = Queue.new end # @param data [String] # @return [void] def write(data) @queue << data rescue ClosedQueueError raise Closed, "[BUG] write to closed queue" end # @return [Boolean] def empty? @queue.empty? end # @return [Boolean] def closed? @queue.closed? end # @return [void] def close @queue.close end # This method is not thread safe (as RecvBuffer is designed to be a multi-producer/single-consumer) # @param size [Integer,nil] # @param last [Boolean] # @param blocking [Boolean] # @return [String,Symbol,nil] def read(size = nil, last: false, blocking:) if @buffer.empty? return nil if empty? && closed? return :wait_readable if empty? && !blocking # Consume existing data as much as possible to continue (important on clients where single-threaded) loop do begin data = @queue.shift(!blocking) @buffer << data if data rescue ThreadError break end break if empty? end end buf = if size.nil? || @buffer.bytesize < size rbuf = @buffer @buffer = ''.b rbuf else @buffer.freeze rbuf = @buffer.byteslice(0, size) @buffer = @buffer.byteslice(size, @buffer.bytesize) rbuf end end_read if last buf end # @return [Boolean] def end_read? @end end # @return [void] def end_read @end = true end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
grpc_kit-0.5.1 | lib/grpc_kit/session/recv_buffer.rb |
grpc_kit-0.5.0 | lib/grpc_kit/session/recv_buffer.rb |
grpc_kit-0.4.0 | lib/grpc_kit/session/recv_buffer.rb |