lib/rflow/component.rb in rflow-1.0.0a3 vs lib/rflow/component.rb in rflow-1.0.0a4

- old
+ new

@@ -34,11 +34,11 @@ # been loaded. It will first attempt to find subclasses of # RFlow::Component (in the available_components hash) and then # attempt to constantize the specification into a different # class. Future releases will support external (i.e. non-managed # components), but the current stuff only supports Ruby classes - def build(config) + def build(worker, config) raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed? RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})" begin component_class = RFlow.configuration.available_components[config.specification] @@ -48,11 +48,11 @@ else RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'" component_class = config.specification.constantize end - component_class.new(uuid: config.uuid, name: config.name).tap do |component| + component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component| config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid } config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid } config.input_ports.each do |p| p.input_connections.each do |c| @@ -73,19 +73,22 @@ end end end attr_accessor :uuid, :name - attr_reader :ports + attr_reader :ports, :worker def initialize(args = {}) @name = args[:name] @uuid = args[:uuid] + @worker = args[:worker] @ports = PortCollection.new self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) } self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) } end + + def shard; worker.shard if worker; end # Returns a list of connected input ports. Each port will have # one or more keys associated with a particular connection. def input_ports; ports.by_type["RFlow::Component::InputPort"]; end