lib/rflow/component.rb in rflow-1.0.0a2 vs lib/rflow/component.rb in rflow-1.0.0a3

- old
+ new

@@ -22,20 +22,11 @@ def define_port(collection, name) collection[name.to_s] = true # Create the port accessor method based on the port name define_method name.to_s.to_sym do - port = ports.by_name[name.to_s] - return port if port - - # If the port was not connected, return a port-like object - # that can respond/log but doesn't send any data. Note, - # it won't be available in the 'by_uuid' collection, as it - # doesn't have a configured uuid - RFlow.logger.debug "'#{self.name}##{name}' not connected, creating a disconnected port" - - DisconnectedPort.new(OpenStruct.new(:name => name, :uuid => 0)).tap {|d| ports << d } + ports.by_name[name.to_s] end end # Attempt to instantiate a component described by the config # specification. This assumes that the specification of a @@ -48,56 +39,90 @@ def build(config) raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed? RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})" begin - component = RFlow.configuration.available_components[config.specification] + component_class = RFlow.configuration.available_components[config.specification] - if component + if component_class RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']" - component.new(config) else RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'" - config.specification.constantize.new(config) + component_class = config.specification.constantize end + + component_class.new(uuid: config.uuid, name: config.name).tap do |component| + config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid } + config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid } + + config.input_ports.each do |p| + p.input_connections.each do |c| + component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c) + end + end + + config.output_ports.each do |p| + p.output_connections.each do |c| + component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c) + end + end + end rescue NameError => e - raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' was not found" + raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' could not be loaded (#{e.message})" rescue Exception => e - raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}" + raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}" end end end - attr_reader :uuid, :name, :ports + attr_accessor :uuid, :name + attr_reader :ports - def initialize(config) - @config = config - @uuid = config.uuid - @name = config.name + def initialize(args = {}) + @name = args[:name] + @uuid = args[:uuid] @ports = PortCollection.new - configure_ports! - configure_connections! + self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) } + self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) } end # Returns a list of connected input ports. Each port will have # one or more keys associated with a particular connection. def input_ports; ports.by_type["RFlow::Component::InputPort"]; end # Returns a list of connected output ports. Each port will have # one or more keys associated with the particular connection. def output_ports; ports.by_type["RFlow::Component::OutputPort"]; end - # Returns a list of disconnected output ports. - def disconnected_ports; ports.by_type["RFlow::Component::DisconnectedPort"]; end + def configure_input_port!(port_name, options = {}) + RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})" + unless self.class.defined_input_ports.include? port_name + raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'" + end + ports.by_name[port_name].uuid = options[:uuid] + end + def configure_output_port!(port_name, options = {}) + RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})" + unless self.class.defined_output_ports.include? port_name + raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'" + end + ports.by_name[port_name].uuid = options[:uuid] + end + # Tell the component to establish its ports' connections, i.e. make # the connection. Uses the underlying connection object. Also # establishes the callbacks for each of the input ports - def connect! + def connect_inputs! input_ports.each {|port| port.recv_callback = method(:process_message) } input_ports.each(&:connect!) + end + + # Tell the component to establish its ports' connections, i.e. make + # the connection. Uses the underlying connection object. + def connect_outputs! output_ports.each(&:connect!) end def to_s string = "Component '#{name}' (#{uuid})\n" @@ -135,42 +160,7 @@ # Method called after all components have been shutdown! and just # before the global RFlow exit. Sublcasses should implement to # cleanup any leftover state, e.g. flush file handles, etc def cleanup!; end - - private - def configure_ports! - @config.input_ports.each do |p| - RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with input port '#{p.name}' (#{p.uuid})" - unless self.class.defined_input_ports.include? p.name - raise ArgumentError, "Input port '#{p.name}' not defined on component '#{self.class}'" - end - ports << InputPort.new(p) - end - - @config.output_ports.each do |p| - RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with output port '#{p.name}' (#{p.uuid})" - unless self.class.defined_output_ports.include? p.name - raise ArgumentError, "Output port '#{p.name}' not defined on component '#{self.class}'" - end - ports << OutputPort.new(p) - end - end - - def configure_connections! - @config.input_ports.each do |p| - p.input_connections.each do |c| - RFlow.logger.debug "Configuring input port '#{p.name}' (#{p.uuid}) key '#{c.input_port_key}' with #{c.type.to_s} connection '#{c.name}' (#{c.uuid})" - ports.by_uuid[p.uuid].add_connection c.input_port_key, Connection.build(c) - end - end - - @config.output_ports.each do |p| - p.output_connections.each do |c| - RFlow.logger.debug "Configuring output port '#{p.name}' (#{p.uuid}) key '#{c.output_port_key}' with #{c.type.to_s} connection '#{c.name}' (#{c.uuid})" - ports.by_uuid[p.uuid].add_connection c.output_port_key, Connection.build(c) - end - end - end end end