lib/rflow/component.rb in rflow-1.3.0 vs lib/rflow/component.rb in rflow-1.3.1
- old
+ new
@@ -1,26 +1,41 @@
require 'ostruct'
require 'rflow/message'
require 'rflow/component/port'
class RFlow
+ # Parent class for all RFlow components.
class Component
class << self
- # Keep track of available component subclasses
+ # Keep track of available component subclasses.
+ # @!visibility private
def inherited(subclass)
RFlow::Configuration.add_available_component(subclass)
end
- # Define an input port with a given name
+ # When declaring your component class, defines an input port with a given
+ # name. Will also define a port accessor method named after the port for
+ # retrieving it.
+ #
+ # @param name [String]
+ # @return [void]
def input_port(name); define_port(defined_input_ports, name); end
- # Define an output port with a given name
+ # When declaring your component class, defines an output port with a
+ # given name. Will also define a port accessor method named after the
+ # port for retrieving it.
+ #
+ # @param name [String]
+ # @return [void]
def output_port(name); define_port(defined_output_ports, name); end
+ # @!visibility private
def defined_input_ports; @defined_input_ports ||= {}; end
+ # @!visibility private
def defined_output_ports; @defined_output_ports ||= {}; end
+ # @!visibility private
def define_port(collection, name)
collection[name.to_s] = true
# Create the port accessor method based on the port name
define_method name.to_s.to_sym do
@@ -30,14 +45,18 @@
# Attempt to instantiate a component described by the config
# specification. This assumes that the specification of a
# component is a fully qualified Ruby class that has already
# been loaded. It will first attempt to find subclasses of
- # RFlow::Component (in the available_components hash) and then
+ # {RFlow::Component} (in {Configuration#available_components}) and then
# attempt to constantize the specification into a different
# class. Future releases will support external (i.e. non-managed
- # components), but the current stuff only supports Ruby classes
+ # components), but the current stuff only supports Ruby classes.
+ #
+ # @param worker [Shard::Worker] the worker process for the component to run in
+ # @param config [Configuration::Component] the component configuration
+ # @return [RFlow::Component] an instance of the component class
def build(worker, config)
raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?
RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
begin
@@ -72,63 +91,86 @@
raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}"
end
end
end
- attr_accessor :uuid, :name
- attr_reader :ports, :worker
+ # The UUID of the component.
+ # @return [String]
+ attr_accessor :uuid
+ # The name of the component.
+ # @return [String]
+ attr_accessor :name
+ # Collection of the component's input and output ports.
+ # @return [PortCollection]
+ attr_reader :ports
+ # Reference to the worker process in which this instance of the component is running.
+ # @return [Shard::Worker]
+ attr_reader :worker
+ # @param args [Hash] supported args are +:name+, +:uuid+, +:worker+
def initialize(args = {})
@name = args[:name]
@uuid = args[:uuid]
@worker = args[:worker]
@ports = PortCollection.new
self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) }
self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) }
end
+ # @!attribute shard [r]
+ # Reference to the component's worker process's {Shard}.
+ # @return [Shard]
def shard; worker.shard if worker; end
# Returns a list of connected input ports. Each port will have
# one or more keys associated with a particular connection.
+ # @return [Array<InputPort>]
def input_ports; ports.by_type['RFlow::Component::InputPort']; end
# Returns a list of connected output ports. Each port will have
# one or more keys associated with the particular connection.
+ # @return [Array<OutputPort>]
def output_ports; ports.by_type['RFlow::Component::OutputPort']; end
+ # @!visibility private
def configure_input_port!(port_name, options = {})
RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})"
unless self.class.defined_input_ports.include? port_name
raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'"
end
ports.by_name[port_name].uuid = options[:uuid]
end
+ # @!visibility private
def configure_output_port!(port_name, options = {})
RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})"
unless self.class.defined_output_ports.include? port_name
raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'"
end
ports.by_name[port_name].uuid = options[:uuid]
end
+ # @!visibility private
# Tell the component to establish its ports' connections, i.e. make
# the connection. Uses the underlying connection object. Also
# establishes the callbacks for each of the input ports
def connect_inputs!
input_ports.each {|port| port.recv_callback = method(:process_message) }
input_ports.each(&:connect!)
end
+ # @!visibility private
# Tell the component to establish its ports' connections, i.e. make
# the connection. Uses the underlying connection object.
+ # @!visibility private
def connect_outputs!
output_ports.each(&:connect!)
end
+ # Pretty-printed version of the component, its ports, their keys, and their connections.
+ # @return [String]
def to_s
string = "Component '#{name}' (#{uuid})\n"
ports.each do |port|
port.keys.each do |key|
port[key].each do |connection|
@@ -139,31 +181,39 @@
string
end
# Method that should be overridden by a subclass to provide for
# component-specific configuration. The subclass should use the
- # self.configuration attribute (@configuration) to store its
- # particular configuration. The incoming deserialized_configuration
- # parameter is from the RFlow configuration database and is (most
- # likely) a hash. Don't assume that the keys are symbols
+ # {configuration} attribute (+@configuration+) to store its
+ # particular configuration.
+ # @param deserialized_configuration [Hash] from the RFlow configuration database; most likely a Hash. Don't assume that the keys are symbols!
+ # @return [void]
def configure!(deserialized_configuration); end
# Main component running method. Subclasses should implement if
# they want to set up any EventMachine stuffs (servers, clients,
- # etc)
+ # etc.).
+ # @return [void]
def run!; end
# Method called when a message is received on an input port.
- # Subclasses should implement if they want to receive messages
+ # Subclasses should implement if they want to receive messages.
+ # @param input_port [RFlow::Component::InputPort] the input port the message was received on
+ # @param input_port_key [String] if the message was received on a keyed subport, this is the key
+ # @param connection [RFlow::Connection] the connection the message was received on
+ # @param message [RFlow::Message] the message itself
+ # @return [void]
def process_message(input_port, input_port_key, connection, message); end
# Method called when RFlow is shutting down. Subclasses should
- # implment to terminate any servers/clients (or let them finish)
- # and stop sending new data through the flow
+ # implement to terminate any servers/clients (or let them finish)
+ # and stop sending new data through the flow.
+ # @return [void]
def shutdown!; end
# Method called after all components have been shutdown! and just
- # before the global RFlow exit. Sublcasses should implement to
- # cleanup any leftover state, e.g. flush file handles, etc
+ # before the global RFlow exit. Sublcasses should implement to
+ # cleanup any leftover state, e.g. flush file handles, etc.
+ # @return [void]
def cleanup!; end
end
end