lib/rflow/component.rb in rflow-1.0.0a3 vs lib/rflow/component.rb in rflow-1.0.0a4
- old
+ new
@@ -34,11 +34,11 @@
# been loaded. It will first attempt to find subclasses of
# RFlow::Component (in the available_components hash) and then
# attempt to constantize the specification into a different
# class. Future releases will support external (i.e. non-managed
# components), but the current stuff only supports Ruby classes
- def build(config)
+ def build(worker, config)
raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?
RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
begin
component_class = RFlow.configuration.available_components[config.specification]
@@ -48,11 +48,11 @@
else
RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
component_class = config.specification.constantize
end
- component_class.new(uuid: config.uuid, name: config.name).tap do |component|
+ component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component|
config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid }
config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid }
config.input_ports.each do |p|
p.input_connections.each do |c|
@@ -73,19 +73,22 @@
end
end
end
attr_accessor :uuid, :name
- attr_reader :ports
+ attr_reader :ports, :worker
def initialize(args = {})
@name = args[:name]
@uuid = args[:uuid]
+ @worker = args[:worker]
@ports = PortCollection.new
self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) }
self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) }
end
+
+ def shard; worker.shard if worker; end
# Returns a list of connected input ports. Each port will have
# one or more keys associated with a particular connection.
def input_ports; ports.by_type["RFlow::Component::InputPort"]; end