lib/hatetepe/client.rb in hatetepe-0.5.2 vs lib/hatetepe/client.rb in hatetepe-0.6.0.pre

- old
+ new

@@ -1,327 +1,79 @@ -require "em-synchrony" -require "eventmachine" +# encoding: utf-8 -require "hatetepe/builder" -require "hatetepe/connection" -require "hatetepe/parser" -require "hatetepe/request" -require "hatetepe/version" - module Hatetepe - HatetepeError = Class.new(StandardError) - RequestError = Class.new(HatetepeError) - ClientError = Class.new(RequestError) - ServerError = Class.new(RequestError) -end + class Client + include Support::Handlers + include Connection::Status -module Hatetepe::Client - include Hatetepe::Connection + attr_reader :config - # @api private - Job = Struct.new(:fiber, :request, :sent, :response) + def initialize(config) + @config = config + @requests = [] - # The default configuration. - # - # @api public - CONFIG_DEFAULTS = { - :timeout => 5, - :connect_timeout => 5 - } + setup_connection + setup_handlers + notify_handlers(:post_init) + end - # The configuration for this Client instance. - # - # @api public - attr_reader :config + def request(http_method, uri) + request = Request.new(http_method, uri) + request.finished.fulfill + perform(request) + request.served.sync + end - # The pipe of middleware and request transmission/response reception. - # - # @api private - attr_reader :app + def perform(request) + @requests << request - # Initializes a new Client instance. - # - # @param [Hash] config - # Configuration values that overwrite the defaults. - # - # @api semipublic - def initialize(config) - @config = CONFIG_DEFAULTS.merge(config) - @ssl_handshake_completed = EM::DefaultDeferrable.new - end - - # Initializes the parser, request queue, and middleware pipe. - # - # @see EM::Connection#post_init - # - # @api semipublic - def post_init - @builder, @parser = Hatetepe::Builder.new, Hatetepe::Parser.new - @builder.on_write << method(:send_data) - # @builder.on_write {|data| p "|--> #{data}" } - @parser.on_response << method(:receive_response) - - @queue = [] - - @app = proc {|request| send_request(request) } - - self.comm_inactivity_timeout = config[:timeout] - self.pending_connect_timeout = config[:connect_timeout] - - start_tls if config[:ssl] - end - - def ssl_handshake_completed - EM::Synchrony.next_tick { @ssl_handshake_completed.succeed } - end - - # Feeds response data into the parser. - # - # @see EM::Connection#receive_data - # - # @param [String] data - # The received data that's gonna be fed into the parser. - # - # @api semipublic - def receive_data(data) - # p "|<-- #{data}" - @parser << data - end - - # Aborts all outstanding requests. - # - # @see EM::Connection#unbind - # - # @api semipublic - def unbind(reason) - super - @queue.each {|job| job.fiber.resume(:kill) } - end - - # Sends a request and waits for the response without blocking. - # - # Transmission and reception are performed within a separate +Fiber+. - # +#succeed+ and +#fail+ will be called on the +request+ passing the - # response, depending on whether the response indicates success (100-399) - # or failure (400-599). - # - # The request will +#fail+ with a +nil+ response if the connection was - # closed for whatever reason. - # - # TODO find out if there are more cases where the response body - # should automatically be closed. - # - # @api public - def <<(request) - Fiber.new do - EM::Synchrony.sync(@ssl_handshake_completed) if config[:ssl] - - response = @app.call(request) - - if response && (request.verb == "HEAD" || response.status == 204) - response.body.close_write + fiber = Fiber.new do + notify_handlers(:perform, request) + @connection.serialize(request) end + fiber.resume + end - if !response - request.fail - elsif response.failure? - request.fail(response) - else - request.succeed(response) - end - end.resume - end - - # Builds a +Request+, sends it, and blocks while waiting for the response. - # - # @param [Symbol, String] verb - # The HTTP method verb, e.g. +:get+ or +"PUT"+. - # @param [String, URI] uri - # The request URI. - # @param [Hash] headers (optional) - # The request headers. - # @param [#each] body (optional) - # A request body object whose +#each+ method yields objects that respond - # to +#to_s+. - # - # @return [Hatetepe::Response, nil] - # - # @api public - def request(verb, uri, headers = {}, body = []) - uri = URI(uri) - uri.scheme ||= @config[:ssl] ? 'http' : 'https' - uri.host ||= @config[:host] - uri.port ||= @config[:port] - - headers['Host'] ||= "#{uri.host}:#{uri.port}" - - request = Hatetepe::Request.new(verb, URI(uri.to_s), headers, body) - self << request - EM::Synchrony.sync(request) - end - - # Like +#request+, but raises errors for 4xx and 5xx responses. - # - # @param [Symbol, String] verb - # The HTTP method verb, e.g. +:get+ or +"PUT"+. - # @param [String, URI] uri - # The request URI. - # @param [Hash] headers (optional) - # The request headers. - # @param [#each] body (optional) - # A request body object whose +#each+ method yields objects that respond - # to +#to_s+. - # - # @return [Hatetepe]::Response, nil] - # - # @raise [Hatetepe::ClientError] - # If the server responded with a 4xx status code. - # @raise [Hatetepe::ServerError] - # If the server responded with a 5xx status code. - # @raise [Hatetepe::RequestError] - # If the client failed to receive any response at all. - def request!(verb, uri, headers = {}, body = []) - response = request(verb, uri, headers, body) - - if response.nil? - raise Hatetepe::RequestError - elsif response.status >= 500 - raise Hatetepe::ServerError - elsif response.status >= 400 - raise Hatetepe::ClientError + def close + @connection.close end - response - end + def receive(response) + unless (request = correlate(response)) + raise ClientError, 'Unable to correlate with request' + end - # Gracefully stops the client. - # - # Waits for all requests to finish and then stops the client. - # - # @api public - def stop - wait - stop! - end - - # Immediately stops the client by closing the connection. - # - # This will lead to EventMachine's event loop calling {#unbind}, which fail - # all outstanding requests. - # - # @see #unbind - # - # @api public - def stop! - close_connection - end - - # Blocks until the last request has finished receiving its response. - # - # Returns immediately if there are no outstanding requests. - # - # @api public - def wait - if job = @queue.last - EM::Synchrony.sync(job.request) - EM::Synchrony.sync(job.response.body) if job.response + setup_cleanup(request, response) + notify_handlers(:receive, request, response) end - end - # Starts a new Client. - # - # @param [Hash] config - # The +:host+ and +:port+ the Client should connect to. - # - # @return [Hatetepe::Client] - # The new Client instance. - # - # @api public - def self.start(config) - EM.connect(config[:host], config[:port], self, config) - end - - # @api public - def self.request(verb, uri, headers = {}, body = []) - uri = URI(uri) - client = start(host: uri.host, port: uri.port, ssl: uri.scheme == 'https') - client.request(verb, uri, headers, body) - end - - # Feeds the request into the builder and blocks while waiting for the - # response to arrive. - # - # Supports the request bit of HTTP pipelining by waiting until the previous - # request has been sent. - # - # @param [Hatetepe::Request] request - # The request that's gonna be sent. - # - # @return [Hatetepe::Response, nil] - # The received response or +nil+ if the connection has been closed before - # receiving a response. - # - # @api private - def send_request(request) - previous = @queue.last - current = Job.new(Fiber.current, request, false) - @queue << current - - # wait for the previous request to be sent - while previous && !previous.sent - return if Fiber.yield == :kill + def teardown(reason) + while (request = @requests.shift) + if (response = request.served.value) + response.finished.reject(reason) + else + request.served.reject(reason) + end + end end - # send the request - self.comm_inactivity_timeout = 0 - @builder.request(request.to_a) - current.sent = true - self.comm_inactivity_timeout = config[:timeout] + private - # wait for the response - while !current.response - return if Fiber.yield == :kill + def setup_connection + @connection = + EM.connect(@config[:address], @config[:port], Connection::EventMachine) + @connection.parse(method(:receive)) + @connection.closed.then(method(:teardown)) end - # clean up and return response - @queue.delete(current) - current.response - end - - # Relates an incoming response to the corresponding request. - # - # Supports the response bit of HTTP pipelining by relating responses to - # requests in the order the requests were sent. - # - # TODO: raise a more meaningful error. - # - # @param [Hatetepe::Response] response - # The incoming response - # - # @raise [RuntimeError] - # There is no request that's waiting for a response. - # - # @api private - def receive_response(response) - query = proc {|j| j.response.nil? } - - if job = @queue.find(&query) - job.response = response - job.fiber.resume - else - raise "Received response but didn't expect one: #{response.status}" + def correlate(response) + @requests + .detect { |request| request.served.pending? } + .tap { |request| request.served.fulfill(response) if request } end - end - module VerbMethods - [ - :get, :head, :options, :put, :post, :delete, :patch, :connect - ].each do |verb| - define_method(verb) do |uri, headers = {}, body = []| - request(verb, uri, headers, body) - end + def setup_cleanup(request, response) + callback = proc { @requests.delete(request) } + response.finished.then(callback, callback) end end - - include VerbMethods - extend VerbMethods end