lib/wukong/streamer/base.rb in wukong-1.5.4 vs lib/wukong/streamer/base.rb in wukong-2.0.0

- old
+ new

@@ -2,19 +2,23 @@ module Streamer class Base # Options, initially set from the command-line args -- see # Script#process_argv! - attr_accessor :options + attr_reader :own_options # # Accepts option hash from script runner # def initialize options={} - self.options = options + @own_options = options end + def options + Settings.deep_merge own_options + end + # # Pass each record to +#process+ # def stream Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank? @@ -22,10 +26,11 @@ each_record do |line| record = recordize(line.chomp) or next process(*record) do |output_record| emit output_record end + monitor.periodically(record.to_s[0..1000]) end after_stream end def each_record &block @@ -62,19 +67,55 @@ # # Process each record in turn, yielding the records to emit # def process *args, &block - raise "override the process method in your implementation: it should process each record." end # # To track processing errors inline, # pass the line back to bad_record! # def bad_record! key, *args warn "Bad record #{args.inspect[0..400]}" puts ["bad_record-"+key, *args].join("\t") end + + # A periodic logger to track progress + def monitor + @monitor ||= PeriodicMonitor.new + end + + # Defines a process method on the fly to execute the given mapper. + # + # This is still experimental. + # Among other limitations, you can't use ++yield++ -- you have to call + # emit() directly. + def mapper &mapper_block + @mapper_block = mapper_block.to_proc + self.instance_eval do + def process *args, &block + instance_exec(*args, &@mapper_block) + end + end + self + end + + # Creates a new object of this class and injects the given block + # as the process method + def self.mapper *args, &block + self.new.mapper *args, &block + end + + # Delegates back to Wukong to run this instance as a mapper + def run options={} + Wukong.run(self, nil, options) + end + + # Creates a new object of this class and runs it + def self.run options={} + Wukong.run(self.new, nil, options) + end + end end end