lib/arc-furnace/pipeline.rb in arc-furnace-0.1.13 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.14

- old
+ new

@@ -87,9 +87,21 @@ end raise "Filter #{type} is not a Filter!" unless type <= Filter define_intermediate(node_id, type: type, params: params) end + # Define a node that observes rows. By default you get a BlockObserver + # (and when this metaprogramming method is passed a block) that will be passed + # a hash for each row. The result of the block is ignored; all rows + # are forwarded to the next node in the line + def self.observer(node_id, type: BlockObserver, params: {}, &block) + if block_given? && type <= BlockObserver + params[:block] = block + end + raise "Observer #{type} is not an Observer!" unless type <= Observer + define_intermediate(node_id, type: type, params: params) + end + # Create an instance to run a transformation, passing the parameters to # instantiate the transform instance with. The resulting class instance # will have a single public method--#execute, which will perform the # transformation. def self.instance(params = {})