lib/arc-furnace/pipeline.rb in arc-furnace-0.1.13 vs lib/arc-furnace/pipeline.rb in arc-furnace-0.1.14
- old
+ new
@@ -87,9 +87,21 @@
end
raise "Filter #{type} is not a Filter!" unless type <= Filter
define_intermediate(node_id, type: type, params: params)
end
+ # Define a node that observes rows. By default you get a BlockObserver
+ # (and when this metaprogramming method is passed a block) that will be passed
+ # a hash for each row. The result of the block is ignored; all rows
+ # are forwarded to the next node in the line
+ def self.observer(node_id, type: BlockObserver, params: {}, &block)
+ if block_given? && type <= BlockObserver
+ params[:block] = block
+ end
+ raise "Observer #{type} is not an Observer!" unless type <= Observer
+ define_intermediate(node_id, type: type, params: params)
+ end
+
# Create an instance to run a transformation, passing the parameters to
# instantiate the transform instance with. The resulting class instance
# will have a single public method--#execute, which will perform the
# transformation.
def self.instance(params = {})