Sha256: 2026cf7f7698f8a6b470825c9946b78b7320e27918d876cd7c3b114a650a6c81
Contents?: true
Size: 1.95 KB
Versions: 10
Compression:
Stored size: 1.95 KB
Contents
module Wukong
  module Streamer
    #
    # Base streamer: reads lines from $stdin, turns each line into a record
    # with #recordize, hands the record to #process, and writes whatever
    # #process yields using #emit.
    #
    # Subclasses must override #process; #recordize, #emit, #each_record,
    # #before_stream and #after_stream may be overridden as needed.
    #
    class Base
      # Options, initially set from the command-line args -- see
      # Script#process_argv!
      attr_accessor :options

      #
      # Accepts option hash from script runner
      #
      def initialize(options = {})
        self.options = options
      end

      #
      # Pass each record to +#process+
      #
      # Calls #before_stream, then for every line from #each_record strips
      # the trailing newline, recordizes it and splats the result into
      # #process; every record yielded by #process is handed to #emit.
      # Calls #after_stream once the input is exhausted.
      #
      def stream
        Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
        before_stream
        each_record do |line|
          record = recordize(line.chomp)
          # A falsy record (nil/false) means "skip this line"
          next unless record

          process(*record) do |output_record|
            emit output_record
          end
        end
        after_stream
      end

      #
      # Yields each raw input line (newline still attached) read from $stdin
      #
      def each_record(&block)
        $stdin.each(&block)
      end

      # Called exactly once, before streaming begins
      def before_stream
      end

      # Called exactly once, after streaming completes
      def after_stream
      end

      #
      # Default recordizer: returns array of fields by splitting at tabs
      #
      # Returns nil if the line cannot be split (for example when +line+ does
      # not respond to #split); #stream skips such lines.
      #
      def recordize(line)
        line.split("\t")
      rescue StandardError
        nil
      end

      #
      # Serializes the record to output.
      #
      # Emits a single line of tab-separated fields created by calling #to_flat
      # on the record and joining with "\t".
      #
      # Does no escaping or processing of the record -- that's to_flat's job, or
      # yours if you override this method.
      #
      def emit(record)
        puts record.to_flat.join("\t")
      end

      #
      # Process each record in turn, yielding the records to emit
      #
      # Always raises a RuntimeError here: subclasses have to supply their own
      # implementation.
      #
      def process(*args, &block)
        raise "override the process method in your implementation: it should process each record."
      end

      #
      # To track processing errors inline,
      # pass the line back to bad_record!
      #
      # Writes a warning containing the first 401 characters of the inspected
      # +args+, then prints a tab-separated line whose first field is
      # "bad_record-" followed by +key+.
      #
      def bad_record!(key, *args)
        warn "Bad record #{args.inspect[0..400]}"
        # Interpolate rather than concatenate so that symbol (or any other
        # non-String) keys do not raise a TypeError
        puts ["bad_record-#{key}", *args].join("\t")
      end
    end
  end
end
Version data entries
10 entries across 10 versions & 1 rubygems