require 'eventmachine'
require 'evma_httpserver'

require 'rflow'

class RFlow
  module Components
    module HTTP
      # RFlow component that runs an HTTP listener via EM.start_server.
      #
      # Each incoming HTTP request is emitted on +request_port+ as an
      # 'RFlow::Message::Data::HTTP::Request' message.  Messages of type
      # 'RFlow::Message::Data::HTTP::Response' arriving on +response_port+
      # are written back to the connection that originated the request,
      # which is located through the message provenance.
      class Server < RFlow::Component
        input_port :response_port
        output_port :request_port

        attr_accessor :port, :listen, :server_signature, :connections

        # Reads the listen address and port from the component configuration.
        #
        # @param config [Hash] uses 'listen' (default '127.0.0.1') and
        #   'port' (converted with to_i, default 8000)
        def configure!(config)
          @listen = config['listen'] || '127.0.0.1'
          @port = (config['port'] || 8000).to_i
          # Active connections, keyed by the connection signature as a String.
          @connections = {}
        end

        # Starts the listener and registers every accepted connection in
        # +connections+ so that responses can be routed back to it later.
        def run!
          @server_signature = EM.start_server(@listen, @port, Connection) do |conn|
            conn.server = self
            connections[conn.signature.to_s] = conn
            RFlow.logger.debug { "#{name}: Connection from #{conn.client_ip}:#{conn.client_port} to #{conn.server_ip}:#{conn.server_port}" }
          end
        end

        # Receives every message sent to response_port and filters for those
        # that pertain to this component and still have an active connection.
        # This is done by inspecting the provenance: the processing events
        # carrying this component's uuid hold, in their context attribute,
        # the connection signature stored when the request was emitted.
        #
        # Messages of any other data type are ignored.
        def process_message(input_port, input_port_key, connection, message)
          return unless message.data_type_name == 'RFlow::Message::Data::HTTP::Response'

          my_events = message.provenance.select do |processing_event|
            processing_event.component_instance_uuid == uuid
          end

          my_events.each do |processing_event|
            # Look the connection up once; it may already have been removed
            # by Connection#unbind if the client disconnected.
            client_connection = connections[processing_event.context.to_s]
            client_connection.send_http_response message if client_connection
          end
        end

        # One accepted client connection.  Turns parsed HTTP requests into
        # RFlow request messages and writes RFlow response messages back.
        class Connection < EventMachine::Connection
          include EventMachine::HttpServer

          # Placeholder [port, ip] pair used when an endpoint cannot be resolved.
          UNKNOWN_ENDPOINT = ['?', '?.?.?.?'].freeze

          attr_accessor :server
          attr_reader :client_ip, :client_port, :server_ip, :server_port

          # Records both endpoints of the socket, falling back to placeholder
          # values when they cannot be determined.
          def post_init
            @client_port, @client_ip = unpack_endpoint { get_peername }
            @server_port, @server_ip = unpack_endpoint { get_sockname }
            super
            no_environment_strings
          end

          # Logs the amount of raw data received, then hands it to the parser
          # further up the ancestor chain.
          def receive_data(data)
            RFlow.logger.debug { "#{server.name}: Received #{data.bytesize} bytes of data from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port}" }
            super
          end

          # Builds an HTTP::Request message from the parsed request state and
          # sends it out of the server's request_port.  The provenance entry
          # stores this connection's signature as its context so the matching
          # response can be routed back by Server#process_message.
          #
          # Errors are logged rather than raised.
          def process_http_request
            RFlow.logger.debug { "#{server.name}: Received HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}" }

            request_message = RFlow::Message.new('RFlow::Message::Data::HTTP::Request').tap do |m|
              m.data.client_ip = @client_ip
              m.data.client_port = @client_port
              m.data.server_ip = @server_ip
              m.data.server_port = @server_port

              m.data.method = @http_request_method
              m.data.uri = @http_request_uri
              m.data.query_string = @http_query_string
              m.data.protocol = @http_protocol
              m.data.content = @http_post_content
              m.data.headers = {}

              # to_s guards against the header string being unset, so such a
              # request is forwarded with empty headers instead of being
              # dropped by a NoMethodError.
              @http_headers.to_s.split(/\0/).each do |header|
                key, value = header.split(/:\s*/, 2)
                m.data.headers[key] = value
              end

              m.provenance << RFlow::Message::ProcessingEvent.new(server.uuid, Time.now.utc).tap do |e|
                e.context = signature.to_s
                e.completed_at = Time.now.utc
              end
            end

            server.request_port.send_message(request_message)
          rescue StandardError => e
            # StandardError rather than Exception: process-level signals such
            # as SystemExit and Interrupt must not be swallowed here.
            RFlow.logger.error "#{server.name}: Error processing HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}"
          end

          # Writes an HTTP response to the client and closes the connection
          # once it has been written.
          #
          # @param response_message [RFlow::Message, nil] when given, its
          #   status_code, content and headers override the defaults
          #   (200, empty body, text/html)
          def send_http_response(response_message = nil)
            resp = EventMachine::DelegatedHttpResponse.new(self).tap do |r|
              # Default values
              r.status = 200
              r.content = ""
              r.headers["Content-Type"] = "text/html"
              r.headers["Server"] = "Apache"

              if response_message
                r.status = response_message.data.status_code
                r.content = response_message.data.content
                response_message.data.headers.each do |header, value|
                  r.headers[header] = value
                end
              end
            end

            RFlow.logger.debug { "#{server.name}: Sending an HTTP response #{resp.status} to #{client_ip}:#{client_port}" }
            resp.send_response
            close_connection_after_writing
          end

          # Called when a connection is torn down for whatever reason.
          # Removes this connection from the server's list.
          def unbind(reason = nil)
            RFlow.logger.debug { "#{server.name}: Disconnected from HTTP client #{client_ip}:#{client_port}#{reason.nil? ? '' : " due to '#{reason}'"}" }
            server.connections.delete(signature.to_s)
            super()
          end

          private

          # Unpacks the sockaddr yielded by the block into [port, ip].
          # Uses an explicit rescue so the placeholder pair is always assigned
          # on failure, independent of how a `rescue` modifier attached to a
          # multiple assignment is parsed by the running Ruby.
          def unpack_endpoint
            Socket.unpack_sockaddr_in(yield)
          rescue StandardError
            UNKNOWN_ENDPOINT
          end
        end
      end
    end
  end
end