lib/rflow/components/http/server.rb in rflow-components-http-1.0.0a2 vs lib/rflow/components/http/server.rb in rflow-components-http-1.0.0a3
- old
+ new
@@ -1,24 +1,22 @@
require 'eventmachine'
require 'evma_httpserver'
-
require 'rflow'
class RFlow
module Components
module HTTP
-
class Server < RFlow::Component
input_port :response_port
output_port :request_port
attr_accessor :port, :listen, :server_signature, :connections
def configure!(config)
@listen = config['listen'] ? config['listen'] : '127.0.0.1'
@port = config['port'] ? config['port'].to_i : 8000
- @connections = Hash.new
+ @connections = {}
end
def run!
@server_signature = EM.start_server(@listen, @port, Connection) do |conn|
conn.server = self
@@ -56,81 +54,75 @@
RFlow.logger.debug { "#{self.class.name}: Connection from #{@client_ip}:#{@client_port} to #{@server_ip}:#{@server_port}" }
super
no_environment_strings
end
-
def receive_data(data)
RFlow.logger.debug { "#{self.class.name}: Received #{data.bytesize} bytes of data from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port}" }
super
end
-
def process_http_request
RFlow.logger.debug { "#{self.class.name}: Received HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}" }
- processing_event = RFlow::Message::ProcessingEvent.new(server.uuid, Time.now.utc)
+ server.request_port.send_message(RFlow::Message.new('RFlow::Message::Data::HTTP::Request').tap do |m|
+ m.data.client_ip = @client_ip
+ m.data.client_port = @client_port
+ m.data.server_ip = @server_ip
+ m.data.server_port = @server_port
- request_message = RFlow::Message.new('RFlow::Message::Data::HTTP::Request')
+ m.data.method = @http_request_method
+ m.data.uri = @http_request_uri
+ m.data.query_string = @http_query_string
+ m.data.protocol = @http_protocol
+ m.data.content = @http_post_content
+ m.data.headers = {}
- request_message.data.client_ip = @client_ip
- request_message.data.client_port = @client_port
- request_message.data.server_ip = @server_ip
- request_message.data.server_port = @server_port
+ @http_headers.split(/\0/).each do |header|
+ name, val = header.split(/:\s*/, 2)
+ m.data.headers[name] = val
+ end
- request_message.data.method = @http_request_method
- request_message.data.uri = @http_request_uri
- request_message.data.query_string = @http_query_string
- request_message.data.protocol = @http_protocol
- request_message.data.content = @http_post_content
- request_message.data.headers = {}
-
- @http_headers.split(/\0/).each do |header|
- name, val = header.split(/:\s*/, 2)
- request_message.data.headers[name] = val
- end
-
- processing_event.context = signature.to_s
- processing_event.completed_at = Time.now.utc
- request_message.provenance << processing_event
-
- server.request_port.send_message request_message
+ m.provenance << RFlow::Message::ProcessingEvent.new(server.uuid, Time.now.utc).tap do |e|
+ e.context = signature.to_s
+ e.completed_at = Time.now.utc
+ end
+ end)
+ rescue Exception => e
+ RFlow.logger.error "Error processing HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}"
end
+ def send_http_response(response_message = nil)
+ resp = EventMachine::DelegatedHttpResponse.new(self).tap do |r|
+ # Default values
+ r.status = 200
+ r.content = ""
+ r.headers["Content-Type"] = "text/html"
+ r.headers["Server"] = "Apache"
- def send_http_response(response_message=nil)
- resp = EventMachine::DelegatedHttpResponse.new(self)
-
- # Default values
- resp.status = 200
- resp.content = ""
- resp.headers["Content-Type"] = "text/html"
- resp.headers["Server"] = "Apache"
-
- if response_message
- resp.status = response_message.data.status_code
- resp.content = response_message.data.content
- response_message.data.headers.each do |header, value|
- resp.headers[header] = value
+ if response_message
+ r.status = response_message.data.status_code
+ r.content = response_message.data.content
+ response_message.data.headers.each do |header, value|
+ r.headers[header] = value
+ end
end
end
RFlow.logger.debug { "#{self.class.name}: Sending a HTTP response #{resp.status} to #{client_ip}:#{client_port}" }
resp.send_response
close_connection_after_writing
end
-
# Called when a connection is torn down for whatever reason.
# Remove this connection from the server's list
- def unbind(reason=nil)
+ def unbind(reason = nil)
RFlow.logger.debug { "#{self.class.name}: Disconnected from HTTP client #{client_ip}:#{client_port}#{reason.nil? ? '' : " due to '#{reason}'"}" }
server.connections.delete(self.signature.to_s)
super()
end
end
end
-
end
end
end