lib/httpx/plugins/stream.rb in httpx-0.13.2 vs lib/httpx/plugins/stream.rb in httpx-0.14.0
- old
+ new
@@ -1,8 +1,95 @@
# frozen_string_literal: true
module HTTPX
+ class StreamResponse
+ def initialize(request, session, connections)
+ @request = request
+ @session = session
+ @connections = connections
+ @options = @request.options
+ end
+
+ def each(&block)
+ return enum_for(__method__) unless block_given?
+
+ raise Error, "response already streamed" if @response
+
+ @request.stream = self
+
+ begin
+ @on_chunk = block
+
+ if @request.response
+ # if we've already started collecting the payload, yield it first
+ # before proceeding
+ body = @request.response.body
+
+ body.each do |chunk|
+ on_chunk(chunk)
+ end
+ end
+
+ response.raise_for_status
+ response.close
+ ensure
+ @on_chunk = nil
+ end
+ end
+
+ def each_line
+ return enum_for(__method__) unless block_given?
+
+ line = +""
+
+ each do |chunk|
+ line << chunk
+
+ while (idx = line.index("\n"))
+ yield line.byteslice(0..idx - 1)
+
+ line = line.byteslice(idx + 1..-1)
+ end
+ end
+ end
+
+ # This is a ghost method. It's to be used ONLY internally, when processing streams
+ def on_chunk(chunk)
+ raise NoMethodError unless @on_chunk
+
+ @on_chunk.call(chunk)
+ end
+
+ # :nocov:
+ def inspect
+ "#<StreamResponse:#{object_id}>"
+ end
+ # :nocov:
+
+ def to_s
+ response.to_s
+ end
+
+ private
+
+ def response
+ @session.__send__(:receive_requests, [@request], @connections, @options) until @request.response
+
+ @request.response
+ end
+
+ def respond_to_missing?(meth, *args)
+ response.respond_to?(meth, *args) || super
+ end
+
+ def method_missing(meth, *args, &block)
+ return super unless response.respond_to?(meth)
+
+ response.__send__(meth, *args, &block)
+ end
+ end
+
module Plugins
#
# This plugin adds support for stream response (text/event-stream).
#
# https://gitlab.com/honeyryderchuck/httpx/wikis/Stream
@@ -13,14 +100,17 @@
def request(*args, stream: false, **options)
return super(*args, **options) unless stream
requests = args.first.is_a?(Request) ? args : build_requests(*args, options)
-
raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1
- StreamResponse.new(requests.first, self)
+ request = requests.first
+
+ connections = _send_requests(requests, request.options)
+
+ StreamResponse.new(request, self, connections)
end
end
module RequestMethods
attr_accessor :stream
@@ -51,81 +141,13 @@
super
end
end
- class StreamResponse
- def initialize(request, session)
- @request = request
- @session = session
- @options = @request.options
- end
-
- def each(&block)
- return enum_for(__method__) unless block_given?
-
- raise Error, "response already streamed" if @response
-
- @request.stream = self
-
- begin
- @on_chunk = block
-
- response.raise_for_status
- response.close
- ensure
- @on_chunk = nil
- end
- end
-
- def each_line
- return enum_for(__method__) unless block_given?
-
- line = +""
-
- each do |chunk|
- line << chunk
-
- while (idx = line.index("\n"))
- yield line.byteslice(0..idx - 1)
-
- line = line.byteslice(idx + 1..-1)
- end
- end
- end
-
- # This is a ghost method. It's to be used ONLY internally, when processing streams
- def on_chunk(chunk)
- raise NoMethodError unless @on_chunk
-
- @on_chunk.call(chunk)
- end
-
- # :nocov:
- def inspect
- "#<StreamResponse:#{object_id}>"
- end
- # :nocov:
-
- def to_s
- response.to_s
- end
-
- private
-
- def response
- @response ||= @session.__send__(:send_requests, @request, @options).first
- end
-
- def respond_to_missing?(*args)
- @options.response_class.respond_to?(*args) || super
- end
-
- def method_missing(meth, *args, &block)
- return super unless @options.response_class.public_method_defined?(meth)
-
- response.__send__(meth, *args, &block)
- end
+ def self.const_missing(const_name)
+ super unless const_name == :StreamResponse
+ warn "DEPRECATION WARNING: the class #{self}::StreamResponse is deprecated. Use HTTPX::StreamResponse instead."
+ HTTPX::StreamResponse
end
end
register_plugin :stream, Stream
end
end