Sha256: 5fa8c77560e843e83074d3fc0b9514831826462c5f7310ac6c1c95d36b6e17d5

Contents?: true

Size: 1.86 KB

Versions: 3

Compression:

Stored size: 1.86 KB

Contents

# frozen_string_literal: true

module GrpcKit
  module Session
    class RecvBuffer
      class Closed < Exception; end

      def initialize
        @buffer = +''.b
        @end = false
        @queue = Queue.new
      end

      # @param data [String]
      # @return [void]
      def write(data)
        @queue << data
      rescue ClosedQueueError
        raise Closed, "[BUG] write to closed queue"
      end

      # @return [Boolean]
      def empty?
        @queue.empty?
      end

      # @return [Boolean]
      def closed?
        @queue.closed?
      end

      # @return [void]
      def close
        @queue.close
      end

      # This method is not thread safe (as RecvBuffer is designed to be a multi-producer/single-consumer)
      # @param size [Integer,nil]
      # @param last [Boolean]
      # @param blocking [Boolean]
      # @return [String,Symbol,nil]
      def read(size = nil, last: false, blocking:)
        if @buffer.empty?
          return nil if empty? && closed?
          return :wait_readable if empty? && !blocking

          # Consume existing data as much as possible to continue (important on clients where single-threaded)
          loop do
            begin
              data = @queue.shift(!blocking)
              @buffer << data if data
            rescue ThreadError
              break
            end

            break if empty?
          end 
        end

        buf = if size.nil? || @buffer.bytesize < size
          rbuf = @buffer
          @buffer = ''.b
          rbuf
        else
          @buffer.freeze
          rbuf = @buffer.byteslice(0, size)
          @buffer = @buffer.byteslice(size, @buffer.bytesize)
          rbuf
        end

        end_read if last
        buf
      end

      # @return [Boolean]
      def end_read?
        @end
      end

      # @return [void]
      def end_read
        @end = true
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
grpc_kit-0.5.1 lib/grpc_kit/session/recv_buffer.rb
grpc_kit-0.5.0 lib/grpc_kit/session/recv_buffer.rb
grpc_kit-0.4.0 lib/grpc_kit/session/recv_buffer.rb