lib/hatetepe/client.rb in hatetepe-0.4.1 vs lib/hatetepe/client.rb in hatetepe-0.5.0.pre
- old
+ new
@@ -1,218 +1,247 @@
require "em-synchrony"
require "eventmachine"
-require "rack"
-require "uri"
require "hatetepe/builder"
require "hatetepe/connection"
-require "hatetepe/deferred_status_fix"
require "hatetepe/parser"
require "hatetepe/request"
require "hatetepe/version"
-module Hatetepe
- class Client < Hatetepe::Connection; end
-end
+module Hatetepe::Client
+ include Hatetepe::Connection
-require "hatetepe/client/keep_alive"
-require "hatetepe/client/pipeline"
+ # @api private
+ Job = Struct.new(:fiber, :request, :sent, :response)
-class Hatetepe::Client
- attr_reader :app, :config
- attr_reader :parser, :builder
- attr_reader :requests, :pending_transmission, :pending_response
-
+ # The default configuration.
+ #
+ # @api public
+ CONFIG_DEFAULTS = {
+ :timeout => 5,
+ :connect_timeout => 5
+ }
+
+ # The configuration for this Client instance.
+ #
+ # @api public
+ attr_reader :config
+
+ # The pipe of middleware and request transmission/response reception.
+ #
+ # @api private
+ attr_reader :app
+
+ # Initializes a new Client instance.
+ #
+ # @param [Hash] config
+ # Configuration values that overwrite the defaults.
+ #
+ # @api semipublic
def initialize(config)
- @config = config
- @parser, @builder = Hatetepe::Parser.new, Hatetepe::Builder.new
-
- @requests = []
- @pending_transmission, @pending_response = {}, {}
-
- @app = Rack::Builder.new.tap do |b|
- b.use KeepAlive
- b.use Pipeline
- b.run method(:send_request)
- end.to_app
-
- super
+ @config = CONFIG_DEFAULTS.merge(config)
end
-
+
+ # Initializes the parser, request queue, and middleware pipe.
+ #
+ # TODO: Use +Rack::Builder+ for building the app pipe.
+ #
+ # @see EM::Connection#post_init
+ #
+ # @api semipublic
def post_init
- parser.on_response << method(:receive_response)
- # XXX check if the connection is still present
- builder.on_write << method(:send_data)
- #builder.on_write {|data| p "client >> #{data}" }
-
- self.processing_enabled = true
+ @builder, @parser = Hatetepe::Builder.new, Hatetepe::Parser.new
+ @builder.on_write << method(:send_data)
+ # @builder.on_write {|data| p "|--> #{data}" }
+ @parser.on_response << method(:receive_response)
+
+ @queue = []
+
+ @app = method(:send_request)
+
+ self.comm_inactivity_timeout = config[:timeout]
+ self.pending_connect_timeout = config[:connect_timeout]
end
-
+
+ # Feeds response data into the parser.
+ #
+ # @see EM::Connection#receive_data
+ #
+ # @param [String] data
+ # The received data that's gonna be fed into the parser.
+ #
+ # @api semipublic
def receive_data(data)
- #p "client << #{data}"
- parser << data
- rescue => e
- close_connection
- raise e
+ # p "|<-- #{data}"
+ @parser << data
end
-
- def send_request(request)
- id = request.object_id
-
- request.headers.delete "X-Hatetepe-Single"
- builder.request request.to_a
- pending_transmission[id].succeed
-
- pending_response[id] = EM::DefaultDeferrable.new
- EM::Synchrony.sync pending_response[id]
- ensure
- pending_response.delete id
+
+ # Aborts all outstanding requests.
+ #
+ # @see EM::Connection#unbind
+ #
+ # @api semipublic
+ def unbind(reason)
+ super
+ @queue.each {|job| job.fiber.resume(:kill) }
end
-
- def receive_response(response)
- requests.find {|req| !req.response }.tap do |req|
- req.response = response
- pending_response[req.object_id].succeed response
- end
- end
-
- def <<(request)
- request.headers["Host"] ||= "#{config[:host]}:#{config[:port]}"
- request.connection = self
- unless processing_enabled?
- request.fail
- return
- end
-
- requests << request
-
+ # Sends a request and waits for the response without blocking.
+ #
+ # Transmission and reception are performed within a separate +Fiber+.
+ # +#succeed+ and +#fail+ will be called on the +request+ passing the
+ # response, depending on whether the response indicates success (100-399)
+ # or failure (400-599).
+ #
+ # The request will +#fail+ with a +nil+ response if the connection was
+ # closed for whatever reason.
+ #
+ # @api public
+ def <<(request)
Fiber.new do
- begin
- pending_transmission[request.object_id] = EM::DefaultDeferrable.new
-
- app.call(request).tap do |response|
- request.response = response
- # XXX check for response.nil?
- status = (response && response.success?) ? :succeed : :fail
- requests.delete(request).send status, response
- end
- ensure
- pending_transmission.delete request.object_id
+ response = @app.call(request)
+
+ if !response || response.failure?
+ request.fail(response)
+ else
+ request.succeed(response)
end
end.resume
end
-
- def request(verb, uri, headers = {}, body = nil, http_version = "1.1")
- headers["User-Agent"] ||= "hatetepe/#{Hatetepe::VERSION}"
-
- body = wrap_body(body)
- headers, body = encode_body(headers.dup, body)
-
- request = Hatetepe::Request.new(verb, uri, headers, body, http_version)
- self << request
- # XXX shouldn't this happen in ::request ?
- self.processing_enabled = false
-
- EM::Synchrony.sync request
-
- request.response.body.close_write if request.verb == "HEAD"
-
- request.response
+ # Builds a +Request+, sends it, and blocks while waiting for the response.
+ #
+ # @param [Symbol, String] verb
+ # The HTTP method verb, e.g. +:get+ or +"PUT"+.
+ # @param [String, URI] uri
+ # The request URI.
+ # @param [Hash] headers (optional)
+ # The request headers.
+ # @param [#each] body (optional)
+ # A request body object whose +#each+ method yields objects that respond
+ # to +#to_s+.
+ #
+ # @return [Hatetepe::Response, nil]
+ #
+ # @api public
+ def request(verb, uri, headers = {}, body = [])
+ request = Hatetepe::Request.new(verb, uri, headers, body)
+ self << request
+ EM::Synchrony.sync(request)
end
-
+
+ # Gracefully stops the client.
+ #
+ # Waits for all requests to finish and then stops the client.
+ #
+ # @api public
def stop
- unless requests.empty?
- last_response = EM::Synchrony.sync(requests.last)
- EM::Synchrony.sync last_response.body if last_response.body
- end
+ wait
+ stop!
+ end
+
+ # Immediately stops the client by closing the connection.
+ #
+ # This will lead to EventMachine's event loop calling {#unbind}, which fail
+ # all outstanding requests.
+ #
+ # @see #unbind
+ #
+ # @api public
+ def stop!
close_connection
end
-
- def unbind
- super
-
- EM.next_tick do
- requests.each do |req|
- # fail state triggers
- req.object_id.tap do |id|
- pending_transmission[id].fail if pending_transmission[id]
- pending_response[id].fail if pending_response[id]
- end
- # fail reponse body if the response has already been started
- if req.response
- req.response.body.tap {|b| b.close_write unless b.closed_write? }
- end
- # XXX FiberError: dead fiber called because req already succeeded
- # or failed, see github.com/eventmachine/eventmachine/issues/287
- req.fail req.response
- end
+
+ # Blocks until the last request has finished receiving its response.
+ #
+ # Returns immediately if there are no outstanding requests.
+ #
+ # @api public
+ def wait
+ if job = @queue.last
+ EM::Synchrony.sync(job.request)
+ EM::Synchrony.sync(job.response.body) if job.response
end
end
-
- def wrap_body(body)
- if body.respond_to? :each
- body
- elsif body.respond_to? :read
- [body.read]
- elsif body
- [body]
- else
- []
- end
+
+ # Starts a new Client.
+ #
+ # @param [Hash] config
+ # The +:host+ and +:port+ the Client should connect to.
+ #
+ # @return [Hatetepe::Client]
+ # The new Client instance.
+ #
+ # @api public
+ def self.start(config)
+ EM.connect(config[:host], config[:port], self, config)
end
-
- def encode_body(headers, body)
- multipart, urlencoded = false, false
- if Hash === body
- query = lambda do |value|
- case value
- when Array
- value.each &query
- when Hash
- value.values.each &query
- when Rack::Multipart::UploadedFile
- multipart = true
- end
- end
- body.values.each &query
- urlencoded = !multipart
- end
-
- body = if multipart
- boundary = Rack::Multipart::MULTIPART_BOUNDARY
- headers["Content-Type"] = "multipart/form-data; boundary=#{boundary}"
- [Rack::Multipart.build_multipart(body)]
- elsif urlencoded
- headers["Content-Type"] = "application/x-www-form-urlencoded"
- [Rack::Utils.build_nested_query(body)]
- else
- body
- end
-
- [headers, body]
+
+ # @api public
+ def self.request(verb, uri, headers = {}, body = [])
end
-
- class << self
- def start(config)
- EM.connect config[:host], config[:port], self, config
+
+ # Feeds the request into the builder and blocks while waiting for the
+ # response to arrive.
+ #
+ # Supports the request bit of HTTP pipelining by waiting until the previous
+ # request has been sent.
+ #
+ # @param [Hatetepe::Request] request
+ # The request that's gonna be sent.
+ #
+ # @return [Hatetepe::Response, nil]
+ # The received response or +nil+ if the connection has been closed before
+ # receiving a response.
+ #
+ # @api private
+ def send_request(request)
+ previous = @queue.last
+ current = Job.new(Fiber.current, request, false)
+ @queue << current
+
+ # wait for the previous request to be sent
+ while previous && !previous.sent
+ return if Fiber.yield == :kill
end
-
- def request(verb, uri, headers = {}, body = nil)
- uri = URI(uri)
- client = start(:host => uri.host, :port => uri.port)
-
- headers["X-Hatetepe-Single"] = true
- client.request(verb, uri.request_uri, headers, body).tap do |*|
- client.stop
- end
+
+ # send the request
+ self.comm_inactivity_timeout = 0
+ @builder.request(request.to_a)
+ current.sent = true
+ self.comm_inactivity_timeout = config[:timeout]
+
+ # wait for the response
+ while !current.response
+ return if Fiber.yield == :kill
end
+
+ # clean up and return response
+ @queue.delete(current)
+ current.response
end
-
- [self, self.singleton_class].each do |cls|
- [:get, :head, :post, :put, :delete,
- :options, :trace, :connect].each do |verb|
- cls.send(:define_method, verb) {|uri, *args| request verb, uri, *args }
+
+ # Relates an incoming response to the corresponding request.
+ #
+ # Supports the response bit of HTTP pipelining by relating responses to
+ # requests in the order the requests were sent.
+ #
+ # TODO: raise a more meaningful error.
+ #
+ # @param [Hatetepe::Response] response
+ # The incoming response
+ #
+ # @raise [RuntimeError]
+ # There is no request that's waiting for a response.
+ #
+ # @api private
+ def receive_response(response)
+ query = proc {|j| j.response.nil? }
+
+ if job = @queue.find(&query)
+ job.response = response
+ job.fiber.resume
+ else
+ raise "Received response but didn't expect one: #{response.status}"
end
end
end