lib/rflow/components/http/server.rb in rflow-components-http-1.0.0a2 vs lib/rflow/components/http/server.rb in rflow-components-http-1.0.0a3

- old
+ new

@@ -1,24 +1,22 @@ require 'eventmachine' require 'evma_httpserver' - require 'rflow' class RFlow module Components module HTTP - class Server < RFlow::Component input_port :response_port output_port :request_port attr_accessor :port, :listen, :server_signature, :connections def configure!(config) @listen = config['listen'] ? config['listen'] : '127.0.0.1' @port = config['port'] ? config['port'].to_i : 8000 - @connections = Hash.new + @connections = {} end def run! @server_signature = EM.start_server(@listen, @port, Connection) do |conn| conn.server = self @@ -56,81 +54,75 @@ RFlow.logger.debug { "#{self.class.name}: Connection from #{@client_ip}:#{@client_port} to #{@server_ip}:#{@server_port}" } super no_environment_strings end - def receive_data(data) RFlow.logger.debug { "#{self.class.name}: Received #{data.bytesize} bytes of data from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port}" } super end - def process_http_request RFlow.logger.debug { "#{self.class.name}: Received HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}" } - processing_event = RFlow::Message::ProcessingEvent.new(server.uuid, Time.now.utc) + server.request_port.send_message(RFlow::Message.new('RFlow::Message::Data::HTTP::Request').tap do |m| + m.data.client_ip = @client_ip + m.data.client_port = @client_port + m.data.server_ip = @server_ip + m.data.server_port = @server_port - request_message = RFlow::Message.new('RFlow::Message::Data::HTTP::Request') + m.data.method = @http_request_method + m.data.uri = @http_request_uri + m.data.query_string = @http_query_string + m.data.protocol = @http_protocol + m.data.content = @http_post_content + m.data.headers = {} - request_message.data.client_ip = @client_ip - request_message.data.client_port = @client_port - request_message.data.server_ip = @server_ip - request_message.data.server_port = @server_port + @http_headers.split(/\0/).each do |header| + name, val = header.split(/:\s*/, 2) + m.data.headers[name] = val + end - request_message.data.method = @http_request_method - request_message.data.uri = @http_request_uri - request_message.data.query_string = @http_query_string - request_message.data.protocol = @http_protocol - request_message.data.content = @http_post_content - request_message.data.headers = {} - - @http_headers.split(/\0/).each do |header| - name, val = header.split(/:\s*/, 2) - request_message.data.headers[name] = val - end - - processing_event.context = signature.to_s - processing_event.completed_at = Time.now.utc - request_message.provenance << processing_event - - server.request_port.send_message request_message + m.provenance << RFlow::Message::ProcessingEvent.new(server.uuid, Time.now.utc).tap do |e| + e.context = signature.to_s + e.completed_at = Time.now.utc + end + end) + rescue Exception => e + RFlow.logger.error "Error processing HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}" end + def send_http_response(response_message = nil) + resp = EventMachine::DelegatedHttpResponse.new(self).tap do |r| + # Default values + r.status = 200 + r.content = "" + r.headers["Content-Type"] = "text/html" + r.headers["Server"] = "Apache" - def send_http_response(response_message=nil) - resp = EventMachine::DelegatedHttpResponse.new(self) - - # Default values - resp.status = 200 - resp.content = "" - resp.headers["Content-Type"] = "text/html" - resp.headers["Server"] = "Apache" - - if response_message - resp.status = response_message.data.status_code - resp.content = response_message.data.content - response_message.data.headers.each do |header, value| - resp.headers[header] = value + if response_message + r.status = response_message.data.status_code + r.content = response_message.data.content + response_message.data.headers.each do |header, value| + r.headers[header] = value + end end end RFlow.logger.debug { "#{self.class.name}: Sending a HTTP response #{resp.status} to #{client_ip}:#{client_port}" } resp.send_response close_connection_after_writing end - # Called when a connection is torn down for whatever reason. # Remove this connection from the server's list - def unbind(reason=nil) + def unbind(reason = nil) RFlow.logger.debug { "#{self.class.name}: Disconnected from HTTP client #{client_ip}:#{client_port}#{reason.nil? ? '' : " due to '#{reason}'"}" } server.connections.delete(self.signature.to_s) super() end end end - end end end