lib/hatetepe/client.rb in hatetepe-0.3.1 vs lib/hatetepe/client.rb in hatetepe-0.4.0

- old
+ new

@@ -1,111 +1,186 @@ require "em-synchrony" require "eventmachine" +require "rack" require "uri" -require "hatetepe/body" require "hatetepe/builder" +require "hatetepe/connection" +require "hatetepe/deferred_status_fix" require "hatetepe/parser" require "hatetepe/request" -require "hatetepe/response" require "hatetepe/version" module Hatetepe - class Client < EM::Connection - def self.start(config) - EM.connect config[:host], config[:port], self, config - end + class Client < Hatetepe::Connection; end +end + +require "hatetepe/client/keep_alive" +require "hatetepe/client/pipeline" + +class Hatetepe::Client + attr_reader :app, :config + attr_reader :parser, :builder + attr_reader :requests, :pending_transmission, :pending_response + + def initialize(config) + @config = config + @parser, @builder = Hatetepe::Parser.new, Hatetepe::Builder.new - def self.request(verb, uri, headers = {}, body = nil) - uri = URI.parse(uri) - client = start(:host => uri.host, :port => uri.port) - - headers["User-Agent"] ||= "hatetepe/#{VERSION}" - - Request.new(verb, uri.request_uri).tap do |req| - req.headers = headers - req.body = body || Body.new.tap {|b| b.close_write } - client << req - EM::Synchrony.sync req - end.response + @requests = [] + @pending_transmission, @pending_response = {}, {} + + @app = Rack::Builder.new.tap do |b| + b.use KeepAlive + b.use Pipeline + b.run method(:send_request) + end.to_app + + super + end + + def post_init + parser.on_response << method(:receive_response) + # XXX check if the connection is still present + builder.on_write << method(:send_data) + #builder.on_write {|data| p "client >> #{data}" } + + self.processing_enabled = true + end + + def receive_data(data) + #p "client << #{data}" + parser << data + rescue => e + close_connection + raise e + end + + def send_request(request) + id = request.object_id + + request.headers.delete "X-Hatetepe-Single" + builder.request request.to_a + pending_transmission[id].succeed + + pending_response[id] = EM::DefaultDeferrable.new + EM::Synchrony.sync pending_response[id] + ensure + pending_response.delete id + end + + def receive_response(response) + requests.find {|req| !req.response }.tap do |req| + req.response = response + pending_response[req.object_id].succeed response end + end + + def <<(request) + request.connection = self + unless processing_enabled? + request.fail + return + end - class << self - [:get, :head].each do |verb| - define_method verb do |uri, headers = {}| - request verb.to_s.upcase, uri, headers + requests << request + + Fiber.new do + begin + pending_transmission[request.object_id] = EM::DefaultDeferrable.new + + app.call(request).tap do |response| + request.response = response + # XXX check for response.nil? + status = (response && response.success?) ? :succeed : :fail + requests.delete(request).send status, response end + ensure + pending_transmission.delete request.object_id end - [:options, :post, :put, :delete, :trace, :connect].each do |verb| - define_method verb do |uri, headers = {}, body = nil| - request verb.to_s.upcase, uri, headers, body - end - end + end.resume + end + + def request(verb, uri, headers = {}, body = nil, http_version = "1.1") + headers["Host"] ||= "#{config[:host]}:#{config[:port]}" + headers["User-Agent"] ||= "hatetepe/#{Hatetepe::VERSION}" + + body = wrap_body(body) + if headers["Content-Type"] == "application/x-www-form-urlencoded" + enum = Enumerator.new(body) + headers["Content-Length"] = enum.inject(0) {|a, e| a + e.length } end - attr_reader :config - attr_reader :requests, :parser, :builder + request = Hatetepe::Request.new(verb, uri, headers, body, http_version) + self << request + self.processing_enabled = false + EM::Synchrony.sync request - def initialize(config) - @config = config - @requests = [] - @parser, @builder = Parser.new, Builder.new - super - end + request.response.body.close_write if request.verb == "HEAD" - def post_init - parser.on_response do |response| - requests.find {|req| !req.response }.response = response - end - - parser.on_headers do - requests.reverse.find {|req| !!req.response }.tap do |req| - req.response.body.source = self - req.succeed req.response - end - end - - #builder.on_write {|chunk| - # ap "-> #{chunk}" - #} - builder.on_write << method(:send_data) + request.response + end + + def stop + unless requests.empty? + last_response = EM::Synchrony.sync(requests.last) + EM::Synchrony.sync last_response.body if last_response.body end + close_connection + end + + def unbind + super - def <<(request) - request.headers["Host"] = "#{config[:host]}:#{config[:port]}" - - requests << request - Fiber.new do - builder.request_line request.verb, request.uri - - if request.headers["Content-Type"] == "application/x-www-form-urlencoded" - if request.body.respond_to? :read - request.headers["Content-Length"] = request.body.read.bytesize - else - request.headers["Content-Length"] = request.body.length - end + EM.next_tick do + requests.each do |req| + # fail state triggers + req.object_id.tap do |id| + pending_transmission[id].fail if pending_transmission[id] + pending_response[id].fail if pending_response[id] end - builder.headers request.headers - - b = request.body - if Body === b || b.respond_to?(:each) - builder.body b - elsif b.respond_to? :read - builder.body [b.read] - else - builder.body [b] + # fail reponse body if the response has already been started + if req.response + req.response.body.tap {|b| b.close_write unless b.closed_write? } end - - builder.complete - end.resume + # XXX FiberError: dead fiber called because req already succeeded + # or failed, see github.com/eventmachine/eventmachine/issues/287 + req.fail req.response + end end - - def receive_data(data) - #ap "<- #{data}" - parser << data + end + + def wrap_body(body) + if body.respond_to? :each + body + elsif body.respond_to? :read + [body.read] + elsif body + [body] + else + [] end + end + + class << self + def start(config) + EM.connect config[:host], config[:port], self, config + end - def stop - close_connection + def request(verb, uri, headers = {}, body = nil) + uri = URI(uri) + client = start(:host => uri.host, :port => uri.port) + + headers["X-Hatetepe-Single"] = true + client.request(verb, uri.request_uri, headers, body).tap do |*| + client.stop + end + end + end + + [self, self.singleton_class].each do |cls| + [:get, :head, :post, :put, :delete, + :options, :trace, :connect].each do |verb| + cls.send(:define_method, verb) {|uri, *args| request verb, uri, *args } end end end