lib/wukong/streamer/base.rb in wukong-1.5.4 vs lib/wukong/streamer/base.rb in wukong-2.0.0
- old
+ new
@@ -2,19 +2,23 @@
module Streamer
class Base
# Options, initially set from the command-line args -- see
# Script#process_argv!
- attr_accessor :options
+ attr_reader :own_options
#
# Accepts option hash from script runner
#
def initialize options={}
- self.options = options
+ @own_options = options
end
+ def options
+ Settings.deep_merge own_options
+ end
+
#
# Pass each record to +#process+
#
def stream
Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
@@ -22,10 +26,11 @@
each_record do |line|
record = recordize(line.chomp) or next
process(*record) do |output_record|
emit output_record
end
+ monitor.periodically(record.to_s[0..1000])
end
after_stream
end
def each_record &block
@@ -62,19 +67,55 @@
#
# Process each record in turn, yielding the records to emit
#
def process *args, &block
- raise "override the process method in your implementation: it should process each record."
end
#
# To track processing errors inline,
# pass the line back to bad_record!
#
def bad_record! key, *args
warn "Bad record #{args.inspect[0..400]}"
puts ["bad_record-"+key, *args].join("\t")
end
+
+ # A periodic logger to track progress
+ def monitor
+ @monitor ||= PeriodicMonitor.new
+ end
+
+ # Defines a process method on the fly to execute the given mapper.
+ #
+ # This is still experimental.
+ # Among other limitations, you can't use ++yield++ -- you have to call
+ # emit() directly.
+ def mapper &mapper_block
+ @mapper_block = mapper_block.to_proc
+ self.instance_eval do
+ def process *args, &block
+ instance_exec(*args, &@mapper_block)
+ end
+ end
+ self
+ end
+
+ # Creates a new object of this class and injects the given block
+ # as the process method
+ def self.mapper *args, &block
+ self.new.mapper *args, &block
+ end
+
+ # Delegates back to Wukong to run this instance as a mapper
+ def run options={}
+ Wukong.run(self, nil, options)
+ end
+
+ # Creates a new object of this class and runs it
+ def self.run options={}
+ Wukong.run(self.new, nil, options)
+ end
+
end
end
end