lib/hatetepe/client.rb in hatetepe-0.4.1 vs lib/hatetepe/client.rb in hatetepe-0.5.0.pre

- old
+ new

@@ -1,218 +1,247 @@ require "em-synchrony" require "eventmachine" -require "rack" -require "uri" require "hatetepe/builder" require "hatetepe/connection" -require "hatetepe/deferred_status_fix" require "hatetepe/parser" require "hatetepe/request" require "hatetepe/version" -module Hatetepe - class Client < Hatetepe::Connection; end -end +module Hatetepe::Client + include Hatetepe::Connection -require "hatetepe/client/keep_alive" -require "hatetepe/client/pipeline" + # @api private + Job = Struct.new(:fiber, :request, :sent, :response) -class Hatetepe::Client - attr_reader :app, :config - attr_reader :parser, :builder - attr_reader :requests, :pending_transmission, :pending_response - + # The default configuration. + # + # @api public + CONFIG_DEFAULTS = { + :timeout => 5, + :connect_timeout => 5 + } + + # The configuration for this Client instance. + # + # @api public + attr_reader :config + + # The pipe of middleware and request transmission/response reception. + # + # @api private + attr_reader :app + + # Initializes a new Client instance. + # + # @param [Hash] config + # Configuration values that overwrite the defaults. + # + # @api semipublic def initialize(config) - @config = config - @parser, @builder = Hatetepe::Parser.new, Hatetepe::Builder.new - - @requests = [] - @pending_transmission, @pending_response = {}, {} - - @app = Rack::Builder.new.tap do |b| - b.use KeepAlive - b.use Pipeline - b.run method(:send_request) - end.to_app - - super + @config = CONFIG_DEFAULTS.merge(config) end - + + # Initializes the parser, request queue, and middleware pipe. + # + # TODO: Use +Rack::Builder+ for building the app pipe. + # + # @see EM::Connection#post_init + # + # @api semipublic def post_init - parser.on_response << method(:receive_response) - # XXX check if the connection is still present - builder.on_write << method(:send_data) - #builder.on_write {|data| p "client >> #{data}" } - - self.processing_enabled = true + @builder, @parser = Hatetepe::Builder.new, Hatetepe::Parser.new + @builder.on_write << method(:send_data) + # @builder.on_write {|data| p "|--> #{data}" } + @parser.on_response << method(:receive_response) + + @queue = [] + + @app = method(:send_request) + + self.comm_inactivity_timeout = config[:timeout] + self.pending_connect_timeout = config[:connect_timeout] end - + + # Feeds response data into the parser. + # + # @see EM::Connection#receive_data + # + # @param [String] data + # The received data that's gonna be fed into the parser. + # + # @api semipublic def receive_data(data) - #p "client << #{data}" - parser << data - rescue => e - close_connection - raise e + # p "|<-- #{data}" + @parser << data end - - def send_request(request) - id = request.object_id - - request.headers.delete "X-Hatetepe-Single" - builder.request request.to_a - pending_transmission[id].succeed - - pending_response[id] = EM::DefaultDeferrable.new - EM::Synchrony.sync pending_response[id] - ensure - pending_response.delete id + + # Aborts all outstanding requests. + # + # @see EM::Connection#unbind + # + # @api semipublic + def unbind(reason) + super + @queue.each {|job| job.fiber.resume(:kill) } end - - def receive_response(response) - requests.find {|req| !req.response }.tap do |req| - req.response = response - pending_response[req.object_id].succeed response - end - end - - def <<(request) - request.headers["Host"] ||= "#{config[:host]}:#{config[:port]}" - request.connection = self - unless processing_enabled? - request.fail - return - end - - requests << request - + # Sends a request and waits for the response without blocking. + # + # Transmission and reception are performed within a separate +Fiber+. + # +#succeed+ and +#fail+ will be called on the +request+ passing the + # response, depending on whether the response indicates success (100-399) + # or failure (400-599). + # + # The request will +#fail+ with a +nil+ response if the connection was + # closed for whatever reason. + # + # @api public + def <<(request) Fiber.new do - begin - pending_transmission[request.object_id] = EM::DefaultDeferrable.new - - app.call(request).tap do |response| - request.response = response - # XXX check for response.nil? - status = (response && response.success?) ? :succeed : :fail - requests.delete(request).send status, response - end - ensure - pending_transmission.delete request.object_id + response = @app.call(request) + + if !response || response.failure? + request.fail(response) + else + request.succeed(response) end end.resume end - - def request(verb, uri, headers = {}, body = nil, http_version = "1.1") - headers["User-Agent"] ||= "hatetepe/#{Hatetepe::VERSION}" - - body = wrap_body(body) - headers, body = encode_body(headers.dup, body) - - request = Hatetepe::Request.new(verb, uri, headers, body, http_version) - self << request - # XXX shouldn't this happen in ::request ? - self.processing_enabled = false - - EM::Synchrony.sync request - - request.response.body.close_write if request.verb == "HEAD" - - request.response + # Builds a +Request+, sends it, and blocks while waiting for the response. + # + # @param [Symbol, String] verb + # The HTTP method verb, e.g. +:get+ or +"PUT"+. + # @param [String, URI] uri + # The request URI. + # @param [Hash] headers (optional) + # The request headers. + # @param [#each] body (optional) + # A request body object whose +#each+ method yields objects that respond + # to +#to_s+. + # + # @return [Hatetepe::Response, nil] + # + # @api public + def request(verb, uri, headers = {}, body = []) + request = Hatetepe::Request.new(verb, uri, headers, body) + self << request + EM::Synchrony.sync(request) end - + + # Gracefully stops the client. + # + # Waits for all requests to finish and then stops the client. + # + # @api public def stop - unless requests.empty? - last_response = EM::Synchrony.sync(requests.last) - EM::Synchrony.sync last_response.body if last_response.body - end + wait + stop! + end + + # Immediately stops the client by closing the connection. + # + # This will lead to EventMachine's event loop calling {#unbind}, which fail + # all outstanding requests. + # + # @see #unbind + # + # @api public + def stop! close_connection end - - def unbind - super - - EM.next_tick do - requests.each do |req| - # fail state triggers - req.object_id.tap do |id| - pending_transmission[id].fail if pending_transmission[id] - pending_response[id].fail if pending_response[id] - end - # fail reponse body if the response has already been started - if req.response - req.response.body.tap {|b| b.close_write unless b.closed_write? } - end - # XXX FiberError: dead fiber called because req already succeeded - # or failed, see github.com/eventmachine/eventmachine/issues/287 - req.fail req.response - end + + # Blocks until the last request has finished receiving its response. + # + # Returns immediately if there are no outstanding requests. + # + # @api public + def wait + if job = @queue.last + EM::Synchrony.sync(job.request) + EM::Synchrony.sync(job.response.body) if job.response end end - - def wrap_body(body) - if body.respond_to? :each - body - elsif body.respond_to? :read - [body.read] - elsif body - [body] - else - [] - end + + # Starts a new Client. + # + # @param [Hash] config + # The +:host+ and +:port+ the Client should connect to. + # + # @return [Hatetepe::Client] + # The new Client instance. + # + # @api public + def self.start(config) + EM.connect(config[:host], config[:port], self, config) end - - def encode_body(headers, body) - multipart, urlencoded = false, false - if Hash === body - query = lambda do |value| - case value - when Array - value.each &query - when Hash - value.values.each &query - when Rack::Multipart::UploadedFile - multipart = true - end - end - body.values.each &query - urlencoded = !multipart - end - - body = if multipart - boundary = Rack::Multipart::MULTIPART_BOUNDARY - headers["Content-Type"] = "multipart/form-data; boundary=#{boundary}" - [Rack::Multipart.build_multipart(body)] - elsif urlencoded - headers["Content-Type"] = "application/x-www-form-urlencoded" - [Rack::Utils.build_nested_query(body)] - else - body - end - - [headers, body] + + # @api public + def self.request(verb, uri, headers = {}, body = []) end - - class << self - def start(config) - EM.connect config[:host], config[:port], self, config + + # Feeds the request into the builder and blocks while waiting for the + # response to arrive. + # + # Supports the request bit of HTTP pipelining by waiting until the previous + # request has been sent. + # + # @param [Hatetepe::Request] request + # The request that's gonna be sent. + # + # @return [Hatetepe::Response, nil] + # The received response or +nil+ if the connection has been closed before + # receiving a response. + # + # @api private + def send_request(request) + previous = @queue.last + current = Job.new(Fiber.current, request, false) + @queue << current + + # wait for the previous request to be sent + while previous && !previous.sent + return if Fiber.yield == :kill end - - def request(verb, uri, headers = {}, body = nil) - uri = URI(uri) - client = start(:host => uri.host, :port => uri.port) - - headers["X-Hatetepe-Single"] = true - client.request(verb, uri.request_uri, headers, body).tap do |*| - client.stop - end + + # send the request + self.comm_inactivity_timeout = 0 + @builder.request(request.to_a) + current.sent = true + self.comm_inactivity_timeout = config[:timeout] + + # wait for the response + while !current.response + return if Fiber.yield == :kill end + + # clean up and return response + @queue.delete(current) + current.response end - - [self, self.singleton_class].each do |cls| - [:get, :head, :post, :put, :delete, - :options, :trace, :connect].each do |verb| - cls.send(:define_method, verb) {|uri, *args| request verb, uri, *args } + + # Relates an incoming response to the corresponding request. + # + # Supports the response bit of HTTP pipelining by relating responses to + # requests in the order the requests were sent. + # + # TODO: raise a more meaningful error. + # + # @param [Hatetepe::Response] response + # The incoming response + # + # @raise [RuntimeError] + # There is no request that's waiting for a response. + # + # @api private + def receive_response(response) + query = proc {|j| j.response.nil? } + + if job = @queue.find(&query) + job.response = response + job.fiber.resume + else + raise "Received response but didn't expect one: #{response.status}" end end end