lib/arc-furnace/pipeline.rb in arc-furnace-0.1.41 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.42
- old
+ new
@@ -15,11 +15,11 @@
end
# Define the sink for this transformation. Only a single sink may be
# specified per transformation. The sink is delivered a hash per row or
# entity, and feeds them from the graph of nodes above it.
- def self.sink(type: , source:, params:)
+ def self.sink(type:, source:, params:)
if sink_node
raise 'Sink already defined!'
end
@sink_node = -> do
@@ -45,11 +45,11 @@
# if an associated entity is not found in the hash for the join key
def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
  # An inner join is registered like any other intermediate node: the join
  # class and its parameters are handed to define_intermediate under node_id.
  join_options = { type: type, params: params }
  define_intermediate(node_id, **join_options)
end
- # Define an outer join nod e where rows from the source are kept
+ # Define an outer join node where rows from the source are kept
# even if an associated entity is not found in the hash for the join key
def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
  # An outer join is registered like any other intermediate node: the join
  # class and its parameters are handed to define_intermediate under node_id.
  join_options = { type: type, params: params }
  define_intermediate(node_id, **join_options)
end
@@ -63,10 +63,16 @@
end
raise "Transform #{type} is not a Transform!" unless type <= Transform
define_intermediate(node_id, type: type, params: params)
end
+ # Define a merge node that combines rows from multiple source nodes into a
+ # single row; registered via define_intermediate (type defaults to ArcFurnace::Merge)
+ def self.merge(node_id, type: ArcFurnace::Merge, params:)
+ define_intermediate(node_id, type: type, params: params)
+ end
+
# Define a node that unfolds rows. By default (and whenever this
# metaprogramming method is passed a block) you get a BlockUnfold, whose block
# is passed a hash for each row. The result of the block becomes the set of
# rows for the next downstream node.
def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
@@ -171,10 +177,12 @@
end
def resolve_parameters(node_id, params_to_resolve)
params_to_resolve.each_with_object({}) do |(key, value), result|
result[key] =
- if value.is_a?(Symbol)
+ if key == :sources
+ value.map { |_value| resolve_parameter(node_id, _value) }
+ elsif value.is_a?(Symbol)
# Allow resolution of intermediates
resolve_parameter(node_id, value)
elsif value.nil?
resolve_parameter(node_id, key)
else