Sha256: ed81db552f997049fd0c8d0bd92934f81d83c8f448a814ea2667eaa22bc27d7c

Contents?: true

Size: 1.26 KB

Versions: 3

Compression:

Stored size: 1.26 KB

Contents

#!/usr/bin/env ruby
require 'rubygems'
require 'wukong/script'

module Size
  #
  # Feed the entire dataset through wc and sum the results
  #
  class Script < Wukong::Script
    #
    # Don't implement a wukong script to do something if there's a unix command
    # that does it faster: just override map_command or reduce_command in your
    # subclass of Wukong::Script to return the complete command line
    #
    def map_command
      '/usr/bin/wc'
    end

    # Make all records go to one reducer
    def default_options
      super.merge :reduce_tasks => 1
    end
  end

  #
  # Sums the numeric value of each column in its input
  #
  class Reducer < Wukong::Streamer::Base
    attr_accessor :sums

    #
    # The unix +wc+ command uses whitespace, not tabs, so we'll recordize
    # accordingly.
    #
    def recordize line
      line.strip.split(/\s+/)
    end

    #
    # add each corresponding column in the input
    #
    def process *vals
      self.sums = vals.zip( sums || [] ).map{|val,sum| val.to_i + sum.to_i }
    end

    #
    # run through the whole reduction input and then output the total
    #
    def stream *args
      super *args
      emit sums
    end
  end
end

# Execute the script
Size::Script.new(
  nil,
  Size::Reducer,
  :reduce_tasks => 1
  ).run

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
wukong-3.0.0.pre old/examples/size.rb
wukong-2.0.2 examples/size.rb
wukong-2.0.1 examples/size.rb