lib/rflow/component.rb in rflow-1.0.0a1 vs lib/rflow/component.rb in rflow-1.0.0a2
- old
+ new
@@ -1,223 +1,118 @@
require 'ostruct'
-
require 'rflow/message'
require 'rflow/component/port'
class RFlow
class Component
- # Keep track of available component subclasses
- def self.inherited(subclass)
- RFlow::Configuration.add_available_component(subclass)
- end
-
-
- # The Component class methods used in the creation of a component
class << self
- def defined_input_ports
- @defined_input_ports ||= Hash.new
+ # Keep track of available component subclasses
+ def inherited(subclass)
+ RFlow::Configuration.add_available_component(subclass)
end
- def defined_output_ports
- @defined_output_ports ||= Hash.new
- end
-
- # TODO: Update the class vs instance stuffs here to be correct
- # Port defintions only have names
-
- # TODO: consider class-based UUIDs to identify component types
-
# Define an input port with a given name
- def input_port(port_name)
- define_port(defined_input_ports, port_name)
- end
+ def input_port(name); define_port(defined_input_ports, name); end
# Define an output port with a given name
- def output_port(port_name)
- define_port(defined_output_ports, port_name)
- end
+ def output_port(name); define_port(defined_output_ports, name); end
- # Helper method to keep things DRY for standard component
- # definition methods input_port and output_port
- def define_port(collection, port_name)
- collection[port_name.to_s] = true
+ def defined_input_ports; @defined_input_ports ||= {}; end
+ def defined_output_ports; @defined_output_ports ||= {}; end
+ def define_port(collection, name)
+ collection[name.to_s] = true
+
# Create the port accessor method based on the port name
- define_method port_name.to_s.to_sym do
- port = ports.by_name[port_name.to_s]
+ define_method name.to_s.to_sym do
+ port = ports.by_name[name.to_s]
return port if port
# If the port was not connected, return a port-like object
# that can respond/log but doesn't send any data. Note,
# it won't be available in the 'by_uuid' collection, as it
# doesn't have a configured uuid
- RFlow.logger.debug "'#{self.name}##{port_name}' not connected, creating a disconnected port"
+ RFlow.logger.debug "'#{self.name}##{name}' not connected, creating a disconnected port"
- disconnected_port = DisconnectedPort.new(OpenStruct.new(:name => port_name, :uuid => 0))
- ports << disconnected_port
- disconnected_port
+ DisconnectedPort.new(OpenStruct.new(:name => name, :uuid => 0)).tap {|d| ports << d }
end
end
-
# Attempt to instantiate a component described by the config
# specification. This assumes that the specification of a
# component is a fully qualified Ruby class that has already
# been loaded. It will first attempt to find subclasses of
# RFlow::Component (in the available_components hash) and then
# attempt to constantize the specification into a different
# class. Future releases will support external (i.e. non-managed
# components), but the current stuff only supports Ruby classes
def build(config)
- if config.managed?
- RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
- begin
- RFlow.logger.debug RFlow.configuration.available_components.inspect
- instantiated_component = if RFlow.configuration.available_components.include? config.specification
- RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
- RFlow.configuration.available_components[config.specification].new(config)
- else
- RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
- config.specification.constantize.new(config)
- end
- rescue NameError => e
- error_message = "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' was not found"
- RFlow.logger.error error_message
- raise RuntimeError, error_message
- rescue Exception => e
- error_message = "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}"
- RFlow.logger.error error_message
- raise RuntimeError, error_message
+ raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?
+
+ RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
+ begin
+ component = RFlow.configuration.available_components[config.specification]
+
+ if component
+ RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
+ component.new(config)
+ else
+ RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
+ config.specification.constantize.new(config)
end
- else
- error_message = "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
- RFlow.logger.error error_message
- raise NotImplementedError, error_message
+ rescue NameError => e
+ raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' was not found"
+ rescue Exception => e
+ raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}"
end
-
- instantiated_component
end
end
- attr_reader :config, :uuid, :name, :ports
+ attr_reader :uuid, :name, :ports
def initialize(config)
@config = config
@uuid = config.uuid
@name = config.name
@ports = PortCollection.new
configure_ports!
configure_connections!
- configure!(config.options)
end
-
# Returns a list of connected input ports. Each port will have
# one or more keys associated with a particular connection.
- def input_ports
- ports.by_type["RFlow::Component::InputPort"]
- end
+ def input_ports; ports.by_type["RFlow::Component::InputPort"]; end
-
# Returns a list of connected output ports. Each port will have
# one or more keys associated with the particular connection.
- def output_ports
- ports.by_type["RFlow::Component::OutputPort"]
- end
+ def output_ports; ports.by_type["RFlow::Component::OutputPort"]; end
-
# Returns a list of disconnected output ports.
- def disconnected_ports
- ports.by_type["RFlow::Component::DisconnectedPort"]
- end
+ def disconnected_ports; ports.by_type["RFlow::Component::DisconnectedPort"]; end
-
- def configure_ports!
- # Send the port configuration to each component
- config.input_ports.each do |input_port_config|
- RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with input port '#{input_port_config.name}' (#{input_port_config.uuid})"
- configure_input_port!(input_port_config)
- end
-
- config.output_ports.each do |output_port_config|
- RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with output port '#{output_port_config.name}' (#{output_port_config.uuid})"
- configure_output_port!(output_port_config)
- end
- end
-
-
- def configure_input_port!(port_config)
- unless self.class.defined_input_ports.include? port_config.name
- raise ArgumentError, "Input port '#{port_config.name}' not defined on component '#{self.class}'"
- end
- ports << InputPort.new(port_config)
- end
-
-
- def configure_output_port!(port_config)
- unless self.class.defined_output_ports.include? port_config.name
- raise ArgumentError, "Output port '#{port_config.name}' not defined on component '#{self.class}'"
- end
- ports << OutputPort.new(port_config)
- end
-
-
- def configure_connections!
- config.input_ports.each do |input_port_config|
- input_port_config.input_connections.each do |input_connection_config|
- RFlow.logger.debug "Configuring input port '#{input_port_config.name}' (#{input_port_config.uuid}) key '#{input_connection_config.input_port_key}' with #{input_connection_config.type.to_s} connection '#{input_connection_config.name}' (#{input_connection_config.uuid})"
- ports.by_uuid[input_port_config.uuid].add_connection(input_connection_config.input_port_key, Connection.build(input_connection_config))
- end
- end
-
- config.output_ports.each do |output_port_config|
- output_port_config.output_connections.each do |output_connection_config|
- RFlow.logger.debug "Configuring output port '#{output_port_config.name}' (#{output_port_config.uuid}) key '#{output_connection_config.output_port_key}' with #{output_connection_config.type.to_s} connection '#{output_connection_config.name}' (#{output_connection_config.uuid})"
- ports.by_uuid[output_port_config.uuid].add_connection(output_connection_config.output_port_key, Connection.build(output_connection_config))
- end
- end
- end
-
-
- # Tell the component to establish it's ports' connections, i.e. make
+ # Tell the component to establish its ports' connections, i.e. make
# the connection. Uses the underlying connection object. Also
# establishes the callbacks for each of the input ports
def connect!
- input_ports.each do |input_port|
- input_port.connect!
-
- # Create the callbacks for recieving messages as a proc
- input_port.keys.each do |input_port_key|
- keyed_connections = input_port[input_port_key]
- keyed_connections.each do |connection|
- connection.recv_callback = Proc.new do |message|
- process_message(input_port, input_port_key, connection, message)
- end
- end
- end
- end
-
- output_ports.each do |output_port|
- output_port.connect!
- end
+ input_ports.each {|port| port.recv_callback = method(:process_message) }
+ input_ports.each(&:connect!)
+ output_ports.each(&:connect!)
end
-
def to_s
string = "Component '#{name}' (#{uuid})\n"
ports.each do |port|
- port.keys.each do |port_key|
- port[port_key].each do |connection|
- string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{port_key}' connection '#{connection.name}' (#{connection.uuid})\n"
+ port.keys.each do |key|
+ port[key].each do |connection|
+ string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{key}' connection '#{connection.name}' (#{connection.uuid})\n"
end
end
end
string
end
-
# Method that should be overridden by a subclass to provide for
# component-specific configuration. The subclass should use the
# self.configuration attribute (@configuration) to store its
# particular configuration. The incoming deserialized_configuration
# parameter is from the RFlow configuration database and is (most
@@ -241,7 +136,41 @@
# Method called after all components have been shutdown! and just
# before the global RFlow exit. Sublcasses should implement to
# cleanup any leftover state, e.g. flush file handles, etc
def cleanup!; end
- end # class Component
-end # class RFlow
+ private
+ def configure_ports!
+ @config.input_ports.each do |p|
+ RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with input port '#{p.name}' (#{p.uuid})"
+ unless self.class.defined_input_ports.include? p.name
+ raise ArgumentError, "Input port '#{p.name}' not defined on component '#{self.class}'"
+ end
+ ports << InputPort.new(p)
+ end
+
+ @config.output_ports.each do |p|
+ RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) with output port '#{p.name}' (#{p.uuid})"
+ unless self.class.defined_output_ports.include? p.name
+ raise ArgumentError, "Output port '#{p.name}' not defined on component '#{self.class}'"
+ end
+ ports << OutputPort.new(p)
+ end
+ end
+
+ def configure_connections!
+ @config.input_ports.each do |p|
+ p.input_connections.each do |c|
+ RFlow.logger.debug "Configuring input port '#{p.name}' (#{p.uuid}) key '#{c.input_port_key}' with #{c.type.to_s} connection '#{c.name}' (#{c.uuid})"
+ ports.by_uuid[p.uuid].add_connection c.input_port_key, Connection.build(c)
+ end
+ end
+
+ @config.output_ports.each do |p|
+ p.output_connections.each do |c|
+ RFlow.logger.debug "Configuring output port '#{p.name}' (#{p.uuid}) key '#{c.output_port_key}' with #{c.type.to_s} connection '#{c.name}' (#{c.uuid})"
+ ports.by_uuid[p.uuid].add_connection c.output_port_key, Connection.build(c)
+ end
+ end
+ end
+ end
+end