lib/wukong/streamer/base.rb in wukong-2.0.0 vs lib/wukong/streamer/base.rb in wukong-2.0.1
- old
+ new
@@ -26,15 +26,19 @@
each_record do |line|
record = recordize(line.chomp) or next
process(*record) do |output_record|
emit output_record
end
- monitor.periodically(record.to_s[0..1000])
+ track(record)
end
after_stream
end
+ def track record
+ monitor.periodically(record.to_s[0..1000])
+ end
+
def each_record &block
$stdin.each(&block)
end
# Called exactly once, before streaming begins
@@ -101,10 +105,10 @@
end
# Creates a new object of this class and injects the given block
# as the process method
def self.mapper *args, &block
- self.new.mapper *args, &block
+ self.new.mapper(*args, &block)
end
# Delegates back to Wukong to run this instance as a mapper
def run options={}
Wukong.run(self, nil, options)