Sha256: 2026cf7f7698f8a6b470825c9946b78b7320e27918d876cd7c3b114a650a6c81

Contents?: true

Size: 1.95 KB

Versions: 10

Compression:

Stored size: 1.95 KB

Contents

module Wukong
  module Streamer
    #
    # Base class for line-oriented streaming processors.
    #
    # #stream reads raw lines from #each_record ($stdin by default) and turns
    # each into a record with #recordize. It splats that record into #process
    # and writes every record #process yields with #emit. Subclasses must
    # override #process; the remaining methods are hooks they may override.
    #
    class Base

      # Options, initially set from the command-line args -- see
      # Script#process_argv!
      attr_accessor :options

      #
      # Accepts option hash from script runner
      #
      # @param options [Hash] settings for this streamer; stored as-is in
      #   #options
      #
      def initialize(options = {})
        self.options = options
      end

      #
      # Pass each record to +#process+.
      #
      # 1. Logs the input file name, unless Script reports a blank one.
      # 2. Calls #before_stream.
      # 3. For each line from #each_record: strips the trailing newline,
      #    recordizes it, splats the record into #process and emits each
      #    record that #process yields.
      # 4. Calls #after_stream.
      #
      # A line for which #recordize returns nil/false is skipped.
      #
      def stream
        Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
        before_stream
        each_record do |line|
          record = recordize(line.chomp)
          # A falsy recordize result is the "skip this line" signal.
          next unless record
          process(*record) do |output_record|
            emit output_record
          end
        end
        after_stream
      end

      #
      # Yields each raw input line to the block. The trailing newline is
      # still attached; #stream chomps it.
      #
      # Reads $stdin by default. Override this to stream from another source.
      #
      def each_record(&block)
        $stdin.each(&block)
      end

      # Called exactly once, before streaming begins
      def before_stream
      end

      # Called exactly once, after streaming completes
      def after_stream
      end

      #
      # Default recordizer: returns array of fields by splitting at tabs.
      #
      # If splitting raises (e.g. +line+ is nil or not a String), it returns
      # nil instead, so #stream skips the line rather than aborting the run.
      #
      def recordize(line)
        line.split("\t")
      rescue StandardError
        nil
      end

      #
      # Serializes the record to output.
      #
      # Emits a single line of tab-separated fields created by calling #to_flat
      # on the record and joining with "\t".
      #
      # Does no escaping or processing of the record -- that's to_flat's job, or
      # yours if you override this method.
      #
      # NOTE(review): #to_flat is not defined in this file. It is presumably
      # provided by Wukong's core extensions; confirm it exists for whatever
      # your #process yields.
      #
      def emit(record)
        puts record.to_flat.join("\t")
      end

      #
      # Process each record in turn, yielding the records to emit.
      #
      # Must be overridden. #stream calls it with the record's fields splatted
      # as positional arguments, plus a block that emits each yielded record.
      #
      # @raise [RuntimeError] always, in this base implementation
      #
      def process(*args, &block)
        raise "override the process method in your implementation: it should process each record."
      end

      #
      # To track processing errors inline, pass the bad record back to
      # bad_record!.
      #
      # Writes a warning to stderr containing the fields' #inspect, truncated
      # to 401 characters. Then prints one tab-separated line to stdout: the
      # tag "bad_record-<key>" followed by the fields.
      #
      # @param key [#to_s] error tag. Symbols and other non-Strings are
      #   accepted because the tag is built by interpolation.
      # @param args fields of the offending record
      #
      def bad_record!(key, *args)
        warn "Bad record #{args.inspect[0..400]}"
        puts ["bad_record-#{key}", *args].join("\t")
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygem

Version Path
wukong-1.5.4 lib/wukong/streamer/base.rb
wukong-1.5.3 lib/wukong/streamer/base.rb
wukong-1.5.2 lib/wukong/streamer/base.rb
wukong-1.5.1 lib/wukong/streamer/base.rb
wukong-1.5.0 lib/wukong/streamer/base.rb
wukong-1.4.12 lib/wukong/streamer/base.rb
wukong-1.4.11 lib/wukong/streamer/base.rb
wukong-1.4.10 lib/wukong/streamer/base.rb
wukong-1.4.9 lib/wukong/streamer/base.rb
wukong-1.4.7 lib/wukong/streamer/base.rb