lib/rflow/component.rb in rflow-0.0.5 vs lib/rflow/component.rb in rflow-1.0.0a1

- old
+ new

@@ -1,16 +1,18 @@ +require 'ostruct' + require 'rflow/message' require 'rflow/component/port' class RFlow class Component # Keep track of available component subclasses def self.inherited(subclass) RFlow::Configuration.add_available_component(subclass) end - + # The Component class methods used in the creation of a component class << self def defined_input_ports @defined_input_ports ||= Hash.new end @@ -21,11 +23,11 @@ # TODO: Update the class vs instance stuffs here to be correct # Port defintions only have names # TODO: consider class-based UUIDs to identify component types - + # Define an input port with a given name def input_port(port_name) define_port(defined_input_ports, port_name) end @@ -39,93 +41,146 @@ def define_port(collection, port_name) collection[port_name.to_s] = true # Create the port accessor method based on the port name define_method port_name.to_s.to_sym do - port = ports.by_name[port_name.to_s] + port = ports.by_name[port_name.to_s] return port if port - + # If the port was not connected, return a port-like object # that can respond/log but doesn't send any data. Note, # it won't be available in the 'by_uuid' collection, as it - # doesn't have a configured instance_uuid + # doesn't have a configured uuid RFlow.logger.debug "'#{self.name}##{port_name}' not connected, creating a disconnected port" - disconnected_port = DisconnectedPort.new(port_name, 0) + + disconnected_port = DisconnectedPort.new(OpenStruct.new(:name => port_name, :uuid => 0)) ports << disconnected_port disconnected_port end end + + + # Attempt to instantiate a component described by the config + # specification. This assumes that the specification of a + # component is a fully qualified Ruby class that has already + # been loaded. It will first attempt to find subclasses of + # RFlow::Component (in the available_components hash) and then + # attempt to constantize the specification into a different + # class. Future releases will support external (i.e. non-managed + # components), but the current stuff only supports Ruby classes + def build(config) + if config.managed? + RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})" + begin + RFlow.logger.debug RFlow.configuration.available_components.inspect + instantiated_component = if RFlow.configuration.available_components.include? config.specification + RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']" + RFlow.configuration.available_components[config.specification].new(config) + else + RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'" + config.specification.constantize.new(config) + end + rescue NameError => e + error_message = "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' was not found" + RFlow.logger.error error_message + raise RuntimeError, error_message + rescue Exception => e + error_message = "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}" + RFlow.logger.error error_message + raise RuntimeError, error_message + end + else + error_message = "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" + RFlow.logger.error error_message + raise NotImplementedError, error_message + end + + instantiated_component + end end - attr_reader :instance_uuid - attr_reader :name - attr_reader :configuration - attr_reader :ports - - def initialize(uuid, name=nil, configuration=nil) - @instance_uuid = uuid - @name = name + attr_reader :config, :uuid, :name, :ports + + def initialize(config) + @config = config + @uuid = config.uuid + @name = config.name @ports = PortCollection.new - @configuration = configuration + + configure_ports! + configure_connections! + configure!(config.options) end - + # Returns a list of connected input ports. Each port will have # one or more keys associated with a particular connection. def input_ports ports.by_type["RFlow::Component::InputPort"] end - + # Returns a list of connected output ports. Each port will have # one or more keys associated with the particular connection. def output_ports ports.by_type["RFlow::Component::OutputPort"] end - + # Returns a list of disconnected output ports. def disconnected_ports ports.by_type["RFlow::Component::DisconnectedPort"] end - - # TODO: DRY up the following two methods by factoring out into a meta-method - - def configure_input_port!(port_name, port_instance_uuid, port_options={}) - unless self.class.defined_input_ports.include? port_name - raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'" + + def configure_ports! + # Send the port configuration to each component + config.input_ports.each do |input_port_config| + RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with input port '#{input_port_config.name}' (#{input_port_config.uuid})" + configure_input_port!(input_port_config) end - ports << InputPort.new(port_name, port_instance_uuid, port_options) + + config.output_ports.each do |output_port_config| + RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with output port '#{output_port_config.name}' (#{output_port_config.uuid})" + configure_output_port!(output_port_config) + end end - - def configure_output_port!(port_name, port_instance_uuid, port_options={}) - unless self.class.defined_output_ports.include? port_name - raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'" + + def configure_input_port!(port_config) + unless self.class.defined_input_ports.include? port_config.name + raise ArgumentError, "Input port '#{port_config.name}' not defined on component '#{self.class}'" end - ports << OutputPort.new(port_name, port_instance_uuid, port_options) + ports << InputPort.new(port_config) end - # Only supports Ruby types. - # TODO: figure out how to dynamically load the built-in - # connections, or require them at the top of the file and not rely - # on rflow.rb requiring 'rflow/connections' - def configure_connection!(port_instance_uuid, port_key, connection_type, connection_uuid, connection_name=nil, connection_options={}) - case connection_type - when 'RFlow::Configuration::ZMQConnection' - connection = RFlow::Connections::ZMQConnection.new(connection_uuid, connection_name, connection_options) - else - raise ArgumentError, "Only ZMQConnections currently supported" + def configure_output_port!(port_config) + unless self.class.defined_output_ports.include? port_config.name + raise ArgumentError, "Output port '#{port_config.name}' not defined on component '#{self.class}'" end + ports << OutputPort.new(port_config) + end - ports.by_uuid[port_instance_uuid.to_s].add_connection(port_key, connection) - connection + + def configure_connections! + config.input_ports.each do |input_port_config| + input_port_config.input_connections.each do |input_connection_config| + RFlow.logger.debug "Configuring input port '#{input_port_config.name}' (#{input_port_config.uuid}) key '#{input_connection_config.input_port_key}' with #{input_connection_config.type.to_s} connection '#{input_connection_config.name}' (#{input_connection_config.uuid})" + ports.by_uuid[input_port_config.uuid].add_connection(input_connection_config.input_port_key, Connection.build(input_connection_config)) + end + end + + config.output_ports.each do |output_port_config| + output_port_config.output_connections.each do |output_connection_config| + RFlow.logger.debug "Configuring output port '#{output_port_config.name}' (#{output_port_config.uuid}) key '#{output_connection_config.output_port_key}' with #{output_connection_config.type.to_s} connection '#{output_connection_config.name}' (#{output_connection_config.uuid})" + ports.by_uuid[output_port_config.uuid].add_connection(output_connection_config.output_port_key, Connection.build(output_connection_config)) + end + end end - + # Tell the component to establish it's ports' connections, i.e. make # the connection. Uses the underlying connection object. Also # establishes the callbacks for each of the input ports def connect! input_ports.each do |input_port| @@ -139,38 +194,38 @@ process_message(input_port, input_port_key, connection, message) end end end end - + output_ports.each do |output_port| output_port.connect! end end - + def to_s - string = "Component '#{name}' (#{instance_uuid})\n" + string = "Component '#{name}' (#{uuid})\n" ports.each do |port| port.keys.each do |port_key| port[port_key].each do |connection| - string << "\t#{port.class.to_s} '#{port.name}' (#{port.instance_uuid}) key '#{port_key}' connection '#{connection.name}' (#{connection.instance_uuid})\n" + string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{port_key}' connection '#{connection.name}' (#{connection.uuid})\n" end end end string end - + # Method that should be overridden by a subclass to provide for # component-specific configuration. The subclass should use the # self.configuration attribute (@configuration) to store its # particular configuration. The incoming deserialized_configuration # parameter is from the RFlow configuration database and is (most # likely) a hash. Don't assume that the keys are symbols def configure!(deserialized_configuration); end - + # Main component running method. Subclasses should implement if # they want to set up any EventMachine stuffs (servers, clients, # etc) def run!; end @@ -185,8 +240,8 @@ # Method called after all components have been shutdown! and just # before the global RFlow exit. Sublcasses should implement to # cleanup any leftover state, e.g. flush file handles, etc def cleanup!; end - + end # class Component end # class RFlow