lib/rflow/component.rb in rflow-1.0.0a2 vs lib/rflow/component.rb in rflow-1.0.0a3
- old
+ new
@@ -22,20 +22,11 @@
def define_port(collection, name)
collection[name.to_s] = true
# Create the port accessor method based on the port name
define_method name.to_s.to_sym do
- port = ports.by_name[name.to_s]
- return port if port
-
- # If the port was not connected, return a port-like object
- # that can respond/log but doesn't send any data. Note,
- # it won't be available in the 'by_uuid' collection, as it
- # doesn't have a configured uuid
- RFlow.logger.debug "'#{self.name}##{name}' not connected, creating a disconnected port"
-
- DisconnectedPort.new(OpenStruct.new(:name => name, :uuid => 0)).tap {|d| ports << d }
+ ports.by_name[name.to_s]
end
end
# Attempt to instantiate a component described by the config
# specification. This assumes that the specification of a
@@ -48,56 +39,90 @@
def build(config)
raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?
RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
begin
- component = RFlow.configuration.available_components[config.specification]
+ component_class = RFlow.configuration.available_components[config.specification]
- if component
+ if component_class
RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
- component.new(config)
else
RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
- config.specification.constantize.new(config)
+ component_class = config.specification.constantize
end
+
+ component_class.new(uuid: config.uuid, name: config.name).tap do |component|
+ config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid }
+ config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid }
+
+ config.input_ports.each do |p|
+ p.input_connections.each do |c|
+ component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c)
+ end
+ end
+
+ config.output_ports.each do |p|
+ p.output_connections.each do |c|
+ component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c)
+ end
+ end
+ end
rescue NameError => e
- raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' was not found"
+ raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' could not be loaded (#{e.message})"
rescue Exception => e
- raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}"
+ raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}"
end
end
end
- attr_reader :uuid, :name, :ports
+ attr_accessor :uuid, :name
+ attr_reader :ports
- def initialize(config)
- @config = config
- @uuid = config.uuid
- @name = config.name
+ def initialize(args = {})
+ @name = args[:name]
+ @uuid = args[:uuid]
@ports = PortCollection.new
- configure_ports!
- configure_connections!
+ self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) }
+ self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) }
end
# Returns a list of connected input ports. Each port will have
# one or more keys associated with a particular connection.
def input_ports; ports.by_type["RFlow::Component::InputPort"]; end
# Returns a list of connected output ports. Each port will have
# one or more keys associated with the particular connection.
def output_ports; ports.by_type["RFlow::Component::OutputPort"]; end
- # Returns a list of disconnected output ports.
- def disconnected_ports; ports.by_type["RFlow::Component::DisconnectedPort"]; end
+ def configure_input_port!(port_name, options = {})
+ RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})"
+ unless self.class.defined_input_ports.include? port_name
+ raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'"
+ end
+ ports.by_name[port_name].uuid = options[:uuid]
+ end
+ def configure_output_port!(port_name, options = {})
+ RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})"
+ unless self.class.defined_output_ports.include? port_name
+ raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'"
+ end
+ ports.by_name[port_name].uuid = options[:uuid]
+ end
+
# Tell the component to establish its ports' connections, i.e. make
# the connection. Uses the underlying connection object. Also
# establishes the callbacks for each of the input ports
- def connect!
+ def connect_inputs!
input_ports.each {|port| port.recv_callback = method(:process_message) }
input_ports.each(&:connect!)
+ end
+
+ # Tell the component to establish its ports' connections, i.e. make
+ # the connection. Uses the underlying connection object.
+ def connect_outputs!
output_ports.each(&:connect!)
end
def to_s
string = "Component '#{name}' (#{uuid})\n"
@@ -135,42 +160,7 @@
# Method called after all components have been shutdown! and just
# before the global RFlow exit. Sublcasses should implement to
# cleanup any leftover state, e.g. flush file handles, etc
def cleanup!; end
-
- private
- def configure_ports!
- @config.input_ports.each do |p|
- RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with input port '#{p.name}' (#{p.uuid})"
- unless self.class.defined_input_ports.include? p.name
- raise ArgumentError, "Input port '#{p.name}' not defined on component '#{self.class}'"
- end
- ports << InputPort.new(p)
- end
-
- @config.output_ports.each do |p|
- RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with output port '#{p.name}' (#{p.uuid})"
- unless self.class.defined_output_ports.include? p.name
- raise ArgumentError, "Output port '#{p.name}' not defined on component '#{self.class}'"
- end
- ports << OutputPort.new(p)
- end
- end
-
- def configure_connections!
- @config.input_ports.each do |p|
- p.input_connections.each do |c|
- RFlow.logger.debug "Configuring input port '#{p.name}' (#{p.uuid}) key '#{c.input_port_key}' with #{c.type.to_s} connection '#{c.name}' (#{c.uuid})"
- ports.by_uuid[p.uuid].add_connection c.input_port_key, Connection.build(c)
- end
- end
-
- @config.output_ports.each do |p|
- p.output_connections.each do |c|
- RFlow.logger.debug "Configuring output port '#{p.name}' (#{p.uuid}) key '#{c.output_port_key}' with #{c.type.to_s} connection '#{c.name}' (#{c.uuid})"
- ports.by_uuid[p.uuid].add_connection c.output_port_key, Connection.build(c)
- end
- end
- end
end
end