lib/rflow/components/http/server.rb in rflow-components-http-0.0.3 vs lib/rflow/components/http/server.rb in rflow-components-http-0.0.6

- old
+ new

@@ -29,24 +29,19 @@ # Getting all messages to response_port, which we need to filter for # those that pertain to this component and have active connections. # This is done by inspecting the provenance, specifically the # context attribute that we stored originally def process_message(input_port, input_port_key, connection, message) - RFlow.logger.debug "Received a message" + RFlow.logger.debug { "#{self.class.name}: Received a #{message.data_type_name}" } return unless message.data_type_name == 'RFlow::Message::Data::HTTP::Response' - - - RFlow.logger.debug "Received a HTTP::Response message, determining if its mine" my_events = message.provenance.find_all {|processing_event| processing_event.component_instance_uuid == instance_uuid} - RFlow.logger.debug "Found #{my_events.size} processing events from me" - # Attempt to send the data to each context match + my_events.each do |processing_event| - RFlow.logger.debug "Inspecting #{processing_event.context}" - connection_signature = processing_event.context - if connections[connection_signature] - RFlow.logger.debug "Found connection for #{processing_event.context}" - connections[connection_signature].send_http_response message + connection_signature_string = processing_event.context.to_s + if connections[connection_signature_string] + RFlow.logger.debug { "#{self.class.name}: Found connection for #{connection_signature_string}" } + connections[connection_signature_string].send_http_response message end end end class Connection < EventMachine::Connection @@ -56,24 +51,24 @@ attr_reader :client_ip, :client_port, :server_ip, :server_port def post_init @client_port, @client_ip = Socket.unpack_sockaddr_in(get_peername) rescue ["?", "?.?.?.?"] @server_port, @server_ip = Socket.unpack_sockaddr_in(get_sockname) rescue ["?", "?.?.?.?"] - RFlow.logger.debug "Connection from #{@client_ip}:#{@client_port} to #{@server_ip}:#{@server_port}" + RFlow.logger.debug { "#{self.class.name}: Connection from #{@client_ip}:#{@client_port} to #{@server_ip}:#{@server_port}" } super no_environment_strings end def receive_data(data) - RFlow.logger.debug "Received #{data.bytesize} bytes of data from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port}" + RFlow.logger.debug { "#{self.class.name}: Received #{data.bytesize} bytes of data from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port}" } super end def process_http_request - RFlow.logger.debug "Received a HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port}" + RFlow.logger.debug { "#{self.class.name}: Received HTTP request from #{client_ip}:#{client_port} to #{@server_ip}:#{@server_port} for #{@http_request_uri}" } processing_event = RFlow::Message::ProcessingEvent.new(server.instance_uuid, Time.now.utc) request_message = RFlow::Message.new('RFlow::Message::Data::HTTP::Request') @@ -92,20 +87,19 @@ @http_headers.split(/\0/).each do |header| name, val = header.split(/:\s*/, 2) request_message.data.headers[name] = val end - processing_event.context = signature + processing_event.context = signature.to_s processing_event.completed_at = Time.now.utc request_message.provenance << processing_event server.request_port.send_message request_message end def send_http_response(response_message=nil) - RFlow.logger.debug "Sending an HTTP response to #{client_ip}:#{client_port}" resp = EventMachine::DelegatedHttpResponse.new(self) # Default values resp.status = 200 resp.content = "" @@ -114,23 +108,25 @@ if response_message resp.status = response_message.data.status_code resp.content = response_message.data.content response_message.data.headers.each do |header, value| - resp[headers] = value + resp.headers[header] = value end end + RFlow.logger.debug { "#{self.class.name}: Sending a HTTP response #{resp.status} to #{client_ip}:#{client_port}" } + resp.send_response close_connection_after_writing end # Called when a connection is torn down for whatever reason. # Remove this connection from the server's list def unbind(reason=nil) - RFlow.logger.debug "Disconnected from HTTP client #{client_ip}:#{client_port} due to '#{reason}'" - server.connections.delete(self.signature) + RFlow.logger.debug { "#{self.class.name}: Disconnected from HTTP client #{client_ip}:#{client_port}#{reason.nil? ? '' : " due to '#{reason}'"}" } + server.connections.delete(self.signature.to_s) end end end end