require 'eventmachine' require 'evma_httpserver' require 'rflow' class RFlow module Components module HTTP # Implements a HTTP server based on +eventmachine_httpserver+. Accepts incoming HTTP # connections, marshals the HTTP request into an RFlow message, annotates the message # with a bit of provenance (see below) and then sends the message out its {request_port}. # When a HTTP response message is received on {response_port}, checks the provenance to see # if it matches an underlying TCP/HTTP connection and, if so, creates an actual HTTP # response from the incoming message and sends it to the client. # # The HTTP request message sent from the HTTP server component utilizes the # +RFlow::Message+ provenance feature to annotate a request message with a bit of metadata # that allows subsequent response messages to be matched to their underlying TCP/HTTP # connections. This means that any component that processes HTTP request messages to # generate response messages must copy the provenance from the request message to the # response message. class Server < RFlow::Component # @!attribute [r] response_port # Expects +RFlow::Message+s of type {RFlow::Message::Data::HTTP::Response} # representing responses being sent back corresponding to previously received # requests. Each response should have +provenance+ copied from the request it's # responding to. # @return [RFlow::Component::InputPort] input_port :response_port # @!attribute [r] request_port # Produces +RFlow::Message+s of type {RFlow::Message::Data::HTTP::Request} # representing requests that have arrived on the socket managed by the component. # Each features +provenance+ that must be copied to any response message. # @return [RFlow::Component::OutputPort] output_port :request_port # @!visibility private attr_accessor :port, :listen, :server_signature, :connections, :closed_connections, :proxy_real_client_ip_header, :proxy_real_client_port_header, :proxy_real_server_ip_header, :proxy_real_server_port_header # RFlow-called method at startup. # @return [void] def configure!(config) @listen = config['listen'] ? config['listen'] : '127.0.0.1' @port = config['port'] ? config['port'].to_i : 8000 @proxy_real_client_ip_header = config.has_key?('proxy-real-client-ip-header') ? config['proxy-real-client-ip-header'] : 'X-Real-IP' @proxy_real_client_port_header = config.has_key?('proxy-real-client-port-header') ? config['proxy-real-client-port-header'] : 'X-Real-Port' @proxy_real_server_ip_header = config.has_key?('proxy-real-server-ip-header') ? config['proxy-real-server-ip-header'] : 'X-Server-IP' @proxy_real_server_port_header = config.has_key?('proxy-real-server-port-header') ? config['proxy-real-server-port-header'] : 'X-Server-Port' @connections = {} @closed_connections = ActiveSupport::Cache::MemoryStore.new(expires_in: 5.minutes) end # RFlow-called method at startup. # @return [void] def run! @server_signature = EM.start_server(@listen, @port, Connection) do |conn| conn.server = self self.connections[conn.signature.to_s] = conn RFlow.logger.debug { "#{name}: Connection from #{conn.client_details} to #{conn.server_details}" } end end # RFlow-called method upon message arrival. # # Filters for messages that pertain to this component and have active connections # by inspecting the provenance, specifically the context attribute that we stored # originally. # # @return [void] def process_message(input_port, input_port_key, connection, message) return unless message.data_type_name == 'RFlow::Message::Data::HTTP::Response' my_events = message.provenance.find_all {|processing_event| processing_event.component_instance_uuid == uuid} my_events.each do |processing_event| connection_signature_string = processing_event.context.to_s if connections[connection_signature_string] connections[connection_signature_string].send_http_response message else conn = closed_connections.read(connection_signature_string) if conn RFlow.logger.info "#{name}: Could not send HTTP response to #{conn.client_details}: connection is already closed" else RFlow.logger.info "#{name}: Could not send HTTP response to : connection is already closed" end end end end # @!visibility private class ClosedConnection attr_accessor :client_details # @!visibility private def initialize(client_details) @client_details = client_details end end # @!visibility private class Connection < EventMachine::Connection include EventMachine::HttpServer attr_accessor :server attr_reader :client_ip, :client_port, :server_ip, :server_port # @!visibility private def post_init @client_port, @client_ip = Socket.unpack_sockaddr_in(get_peername) rescue ["?", "?.?.?.?"] @server_port, @server_ip = Socket.unpack_sockaddr_in(get_sockname) rescue ["?", "?.?.?.?"] super no_environment_strings end # @!visibility private def client_details if @real_client_ip "#{client_ip}:#{client_port} (proxied from #{@real_client_ip}:#{@real_client_port})" else "#{client_ip}:#{client_port}" end end # @!visibility private def server_details if @real_server_ip "#{server_ip}:#{server_port} (proxied as #{@real_server_ip}:#{@real_server_port})" else "#{server_ip}:#{server_port}" end end # @!visibility private def receive_data(data) RFlow.logger.debug { "#{server.name}: Received #{data.bytesize} bytes of data from #{client_details} to #{server_details}" } super end # @!visibility private def process_http_request RFlow.logger.debug { "#{server.name}: Received HTTP request from #{client_details} to #{server_details} for #{@http_request_uri}" } server.request_port.send_message(RFlow::Message.new('RFlow::Message::Data::HTTP::Request').tap do |m| m.data.client_ip = client_ip m.data.client_port = client_port m.data.server_ip = server_ip m.data.server_port = server_port m.data.method = @http_request_method m.data.uri = @http_request_uri m.data.query_string = @http_query_string m.data.protocol = @http_protocol m.data.content = @http_post_content m.data.headers = {} @http_headers.split(/\0/).each do |header| name, val = header.split(/:\s*/, 2) m.data.headers[name] = val if server.proxy_real_client_ip_header && (name == server.proxy_real_client_ip_header) @real_client_ip ||= val elsif server.proxy_real_client_port_header && (name == server.proxy_real_client_port_header) @real_client_port ||= val elsif server.proxy_real_server_ip_header && (name == server.proxy_real_server_ip_header) @real_server_ip ||= val elsif server.proxy_real_server_port_header && (name == server.proxy_real_server_port_header) @real_server_port ||= val end end m.provenance << RFlow::Message::ProcessingEvent.new(server.uuid, Time.now.utc).tap do |e| e.context = signature.to_s e.completed_at = Time.now.utc end end) rescue Exception => e RFlow.logger.error "#{server.name}: Error processing HTTP request from #{client_details} to #{server_details} for #{@http_request_uri}: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}" end # @!visibility private def send_http_response(response_message = nil) resp = EventMachine::DelegatedHttpResponse.new(self).tap do |r| # Default values r.status = 200 r.content = "" r.headers["Content-Type"] = "text/html" r.headers["Server"] = "Apache" if response_message r.status = response_message.data.status_code r.content = response_message.data.content response_message.data.headers.each do |header, value| r.headers[header] = value end end end RFlow.logger.debug { "#{server.name}: Sending an HTTP response #{resp.status} to #{client_details}" } resp.send_response close_connection_after_writing end # Called when a connection is torn down for whatever reason. # Remove this connection from the server's list # @!visibility private def unbind(reason = nil) RFlow.logger.debug { "#{server.name}: Disconnected from HTTP client #{client_details}#{reason.nil? ? '' : " due to '#{reason}'"}" } server.closed_connections.write(signature.to_s, ClosedConnection.new(client_details)) server.connections.delete(signature.to_s) super() end end end end end end