Sha256: 388d61571c43a6a52b1d2c3226982a0071e64f3b77aa48287c1ea4b3b2cd21d7
Contents?: true
Size: 1.76 KB
Versions: 16
Compression:
Stored size: 1.76 KB
Contents
# frozen_string_literal: true

export_default :LineReader

Core = import('./core')

# A stream that can read single lines from another stream.
#
# Data pushed into the reader is accumulated in an internal buffer. Every
# complete line (including its trailing separator) is sliced off the buffer
# and handed to the currently registered promise/generator.
class LineReader
  # Initializes the line reader with a source and optional line separator.
  # When a source is given, the reader subscribes to its :data, :close and
  # :error events.
  # @param source [Stream, nil] source stream
  # @param sep [String] line separator
  # @raise [ArgumentError] if sep is empty
  def initialize(source = nil, sep = $/)
    # An empty separator matches at index 0 forever and would make
    # #emit_lines spin on empty strings, so reject it up front.
    raise ArgumentError, 'line separator must not be empty' if sep.empty?

    @source = source
    if source
      source.on(:data) { |data| push(data) }
      source.on(:close) { close }
      source.on(:error) { |err| error(err) }
    end
    @read_buffer = +''
    @separator = sep
    # String#index and String#slice! work in characters, not bytes, so the
    # separator size must be measured in characters as well; using bytesize
    # would over-consume the buffer for multibyte separators.
    @separator_size = sep.size
  end

  # Pushes data into the read buffer and emits lines
  # @param data [String] data to be read
  # @return [void]
  def push(data)
    @read_buffer << data
    emit_lines
  end

  # Emits every complete line currently held in the read buffer to the
  # registered promise/generator.
  # @return [void]
  def emit_lines
    # Without a registered reader, keep the data buffered instead of slicing
    # a line off and then failing on a nil promise (which would lose the
    # line). Buffered lines are emitted on the next push after a reader has
    # been registered through #gets or #lines.
    return unless @lines_promise

    while (line = _gets)
      @lines_promise.resolve(line)
    end
  end

  # Slices the next complete line, including its separator, off the front of
  # the read buffer.
  # @return [String, nil] line, or nil if the buffer holds no complete line
  def _gets
    idx = @read_buffer.index(@separator)
    idx && @read_buffer.slice!(0, idx + @separator_size)
  end

  # Registers a new promise as the receiver of emitted lines.
  # NOTE(review): presumably the promise resolves with the next line read;
  # confirm against Core.promise.
  # @return [Promise] the value returned by Core.promise
  def gets
    Core.promise do |p|
      @lines_promise = p
    end
  end

  # Returns an async generator of lines and registers it as the receiver of
  # emitted lines.
  # @return [Promise] line generator
  def lines
    Core.generator do |p|
      @lines_promise = p
    end
  end

  # Iterates asynchronously over lines received
  # @return [void]
  def each_line(&block)
    lines.each(&block)
  end

  # Closes the stream and cancels any pending reads
  # @return [void]
  def close
    @lines_promise&.stop
  end

  # Handles an error generated by the source by stopping and rejecting the
  # registered promise/generator, if there is one.
  # @param err [Exception] raised error
  # @return [void]
  def error(err)
    return unless @lines_promise

    @lines_promise.stop
    @lines_promise.reject(err)
  end
end
Version data entries
16 entries across 16 versions & 1 rubygems