lib/httpx/plugins/stream.rb in httpx-0.9.0 vs lib/httpx/plugins/stream.rb in httpx-0.10.0

- old
+ new

@@ -5,30 +5,126 @@ # # This plugin adds support for stream response (text/event-stream). # module Stream module InstanceMethods - def stream - headers("accept" => "text/event-stream", - "cache-control" => "no-cache") + private + + def request(*args, stream: false, **options) + return super(*args, **options) unless stream + + requests = args.first.is_a?(Request) ? args : build_requests(*args, options) + + raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1 + + StreamResponse.new(requests.first, self) end end + module RequestMethods + attr_accessor :stream + end + module ResponseMethods - def complete? - super || - stream? && - @stream_complete + def stream + @request.stream end + end - def stream? - @headers["content-type"].start_with?("text/event-stream") + module ResponseBodyMethods + def initialize(*, **) + super + @stream = @response.stream end - def <<(data) - res = super - @stream_complete = true if String(data).end_with?("\n\n") - res + def write(chunk) + return super unless @stream + + @stream.on_chunk(chunk.to_s.dup) + end + + private + + def transition(*) + return if @stream + + super + end + end + + class StreamResponse + def initialize(request, session) + @request = request + @session = session + @options = @request.options + end + + def each(&block) + return enum_for(__method__) unless block_given? + + raise Error, "response already streamed" if @response + + @request.stream = self + + begin + @on_chunk = block + + response.raise_for_status + response.close + ensure + @on_chunk = nil + end + end + + def each_line + return enum_for(__method__) unless block_given? + + line = +"" + + each do |chunk| + line << chunk + + while (idx = line.index("\n")) + yield line.byteslice(0..idx - 1) + + line = line.byteslice(idx + 1..-1) + end + end + end + + # This is a ghost method. It's to be used ONLY internally, when processing streams + def on_chunk(chunk) + raise NoMethodError unless @on_chunk + + @on_chunk.call(chunk) + end + + # :nocov: + def inspect + "#<StreamResponse:#{object_id}>" + end + # :nocov: + + def to_s + response.to_s + end + + private + + def response + @response ||= @session.__send__(:send_requests, @request, @options).first + end + + def respond_to_missing?(*args) + @options.response_class.respond_to?(*args) || super + end + + def method_missing(meth, *args, &block) + if @options.response_class.public_method_defined?(meth) + response.__send__(meth, *args, &block) + else + super + end end end end register_plugin :stream, Stream end