lib/rflow/component.rb in rflow-1.3.0 vs lib/rflow/component.rb in rflow-1.3.1

- old
+ new

@@ -1,26 +1,41 @@ require 'ostruct' require 'rflow/message' require 'rflow/component/port' class RFlow + # Parent class for all RFlow components. class Component class << self - # Keep track of available component subclasses + # Keep track of available component subclasses. + # @!visibility private def inherited(subclass) RFlow::Configuration.add_available_component(subclass) end - # Define an input port with a given name + # When declaring your component class, defines an input port with a given + # name. Will also define a port accessor method named after the port for + # retrieving it. + # + # @param name [String] + # @return [void] def input_port(name); define_port(defined_input_ports, name); end - # Define an output port with a given name + # When declaring your component class, defines an output port with a + # given name. Will also define a port accessor method named after the + # port for retrieving it. + # + # @param name [String] + # @return [void] def output_port(name); define_port(defined_output_ports, name); end + # @!visibility private def defined_input_ports; @defined_input_ports ||= {}; end + # @!visibility private def defined_output_ports; @defined_output_ports ||= {}; end + # @!visibility private def define_port(collection, name) collection[name.to_s] = true # Create the port accessor method based on the port name define_method name.to_s.to_sym do @@ -30,14 +45,18 @@ # Attempt to instantiate a component described by the config # specification. This assumes that the specification of a # component is a fully qualified Ruby class that has already # been loaded. It will first attempt to find subclasses of - # RFlow::Component (in the available_components hash) and then + # {RFlow::Component} (in {Configuration#available_components}) and then # attempt to constantize the specification into a different # class. Future releases will support external (i.e. non-managed - # components), but the current stuff only supports Ruby classes + # components), but the current stuff only supports Ruby classes. + # + # @param worker [Shard::Worker] the worker process for the component to run in + # @param config [Configuration::Component] the component configuration + # @return [RFlow::Component] an instance of the component class def build(worker, config) raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed? RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})" begin @@ -72,63 +91,86 @@ raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}" end end end - attr_accessor :uuid, :name - attr_reader :ports, :worker + # The UUID of the component. + # @return [String] + attr_accessor :uuid + # The name of the component. + # @return [String] + attr_accessor :name + # Collection of the component's input and output ports. + # @return [PortCollection] + attr_reader :ports + # Reference to the worker process in which this instance of the component is running. + # @return [Shard::Worker] + attr_reader :worker + # @param args [Hash] supported args are +:name+, +:uuid+, +:worker+ def initialize(args = {}) @name = args[:name] @uuid = args[:uuid] @worker = args[:worker] @ports = PortCollection.new self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) } self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) } end + # @!attribute shard [r] + # Reference to the component's worker process's {Shard}. + # @return [Shard] def shard; worker.shard if worker; end # Returns a list of connected input ports. Each port will have # one or more keys associated with a particular connection. + # @return [Array<InputPort>] def input_ports; ports.by_type['RFlow::Component::InputPort']; end # Returns a list of connected output ports. Each port will have # one or more keys associated with the particular connection. + # @return [Array<OutputPort>] def output_ports; ports.by_type['RFlow::Component::OutputPort']; end + # @!visibility private def configure_input_port!(port_name, options = {}) RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})" unless self.class.defined_input_ports.include? port_name raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'" end ports.by_name[port_name].uuid = options[:uuid] end + # @!visibility private def configure_output_port!(port_name, options = {}) RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})" unless self.class.defined_output_ports.include? port_name raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'" end ports.by_name[port_name].uuid = options[:uuid] end + # @!visibility private # Tell the component to establish its ports' connections, i.e. make # the connection. Uses the underlying connection object. Also # establishes the callbacks for each of the input ports def connect_inputs! input_ports.each {|port| port.recv_callback = method(:process_message) } input_ports.each(&:connect!) end + # @!visibility private # Tell the component to establish its ports' connections, i.e. make # the connection. Uses the underlying connection object. + # @!visibility private def connect_outputs! output_ports.each(&:connect!) end + # Pretty-printed version of the component, its ports, their keys, and their connections. + # @return [String] def to_s string = "Component '#{name}' (#{uuid})\n" ports.each do |port| port.keys.each do |key| port[key].each do |connection| @@ -139,31 +181,39 @@ string end # Method that should be overridden by a subclass to provide for # component-specific configuration. The subclass should use the - # self.configuration attribute (@configuration) to store its - # particular configuration. The incoming deserialized_configuration - # parameter is from the RFlow configuration database and is (most - # likely) a hash. Don't assume that the keys are symbols + # {configuration} attribute (+@configuration+) to store its + # particular configuration. + # @param deserialized_configuration [Hash] from the RFlow configuration database; most likely a Hash. Don't assume that the keys are symbols! + # @return [void] def configure!(deserialized_configuration); end # Main component running method. Subclasses should implement if # they want to set up any EventMachine stuffs (servers, clients, - # etc) + # etc.). + # @return [void] def run!; end # Method called when a message is received on an input port. - # Subclasses should implement if they want to receive messages + # Subclasses should implement if they want to receive messages. + # @param input_port [RFlow::Component::InputPort] the input port the message was received on + # @param input_port_key [String] if the message was received on a keyed subport, this is the key + # @param connection [RFlow::Connection] the connection the message was received on + # @param message [RFlow::Message] the message itself + # @return [void] def process_message(input_port, input_port_key, connection, message); end # Method called when RFlow is shutting down. Subclasses should - # implment to terminate any servers/clients (or let them finish) - # and stop sending new data through the flow + # implement to terminate any servers/clients (or let them finish) + # and stop sending new data through the flow. + # @return [void] def shutdown!; end # Method called after all components have been shutdown! and just - # before the global RFlow exit. Sublcasses should implement to - # cleanup any leftover state, e.g. flush file handles, etc + # before the global RFlow exit. Sublcasses should implement to + # cleanup any leftover state, e.g. flush file handles, etc. + # @return [void] def cleanup!; end end end