lib/arc-furnace/pipeline.rb in arc-furnace-0.1.41 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.42

- old
+ new

@@ -15,11 +15,11 @@ end # Define the sink for this transformation. Only a single sink may be # specified per transformation. The sink is delivered a hash per row or # entity, and feeds them from the graph of nodes above it. - def self.sink(type: , source:, params:) + def self.sink(type:, source:, params:) if sink_node raise 'Sink already defined!' end @sink_node = -> do @@ -45,11 +45,11 @@ # if an associated entity is not found in the hash for the join key def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:) define_intermediate(node_id, type: type, params: params) end - # Define an outer join nod e where rows from the source are kept + # Define an outer join node where rows from the source are kept # even if an associated entity is not found in the hash for the join key def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:) define_intermediate(node_id, type: type, params: params) end @@ -63,10 +63,16 @@ end raise "Transform #{type} is not a Transform!" unless type <= Transform define_intermediate(node_id, type: type, params: params) end + # Define a merge node where rows from multiple source nodes are merged + # into a single row + def self.merge(node_id, type: ArcFurnace::Merge, params:) + define_intermediate(node_id, type: type, params: params) + end + # Define a node that unfolds rows. By default you get a BlockUnfold # (and when this metaprogramming method is passed a block) that will be passed # a hash for each row. The result of the block becomes the set of rows for the next # downstream node. def self.unfold(node_id, type: BlockUnfold, params: {}, &block) @@ -171,10 +177,12 @@ end def resolve_parameters(node_id, params_to_resolve) params_to_resolve.each_with_object({}) do |(key, value), result| result[key] = - if value.is_a?(Symbol) + if key == :sources + value.map { |_value| resolve_parameter(node_id, _value) } + elsif value.is_a?(Symbol) # Allow resolution of intermediates resolve_parameter(node_id, value) elsif value.nil? resolve_parameter(node_id, key) else