Here's a script to count words in a text stream:

<pre><code>
require 'wukong'

module WordCount
  class Mapper < Wukong::Streamer::LineStreamer
    # Emit each word in the line
    def process line
      words = line.strip.split(/\W+/).reject(&:blank?)
      words.each{|word| yield [word, 1] }
    end
  end

  class Reducer < Wukong::Streamer::ListReducer
    def finalize
      yield [ key, values.map(&:last).map(&:to_i).sum ]
    end
  end
end

Wukong::Script.new( WordCount::Mapper, WordCount::Reducer ).run # Execute the script
</code></pre>

The first class, the Mapper, eats lines and craps @[word, count]@ records. Here the /key/ is the word, and the /value/ is its count.

The second class is an example of an accumulated list reducer: the values for each key are stacked up into a list, and then the record(s) yielded by @#finalize@ are emitted.

Here's another way to write the Reducer: accumulate the running count for each key, then yield the sum in @#finalize@:

<pre><code>
class Reducer2 < Wukong::Streamer::AccumulatingReducer
  attr_accessor :key_count

  # Reset the count at the start of each new key
  def start! *args
    self.key_count = 0
  end

  # Add this record's count to the running total
  def accumulate(word, count)
    self.key_count += count.to_i
  end

  # Emit the total for the key
  def finalize
    yield [ key, key_count ]
  end
end
</code></pre>

Of course you can be really lazy (that is, smart) and write your script instead as

<pre><code>
class Script < Wukong::Script
  def reducer_command
    'uniq -c'
  end
end
</code></pre>

h2. Structured data

All of these deal with unstructured data. Wukong also lets you view your data as a stream of structured objects. Let's say you have a blog; its records look like

<pre><code>
Post    = Struct.new( :id, :created_at, :user_id, :title, :body, :link )
Comment = Struct.new( :id, :created_at, :post_id, :user_id, :body )
User    = Struct.new( :id, :username, :fullname, :homepage, :description )
UserLoc = Struct.new( :user_id, :text, :lat, :lng )
</code></pre>

You've been using "twitter":http://twitter.com for a long time, and you've written something that from now on will inject all your tweets as Posts, and all replies to them as Comments (by a common 'twitter_bot' account on your blog). What about the past two years' worth of tweets? Let's assume you're so chatty that a Map/Reduce script is warranted to handle the volume.

Cook up something that scrapes your tweets and all replies to your tweets:

<pre><code>
Tweet       = Struct.new( :id, :created_at, :twitter_user_id,
  :in_reply_to_user_id, :in_reply_to_status_id, :text )
TwitterUser = Struct.new( :id, :username, :fullname,
  :homepage, :location, :description )
</code></pre>

Now we'll just process all those in a big pile, converting to Posts, Comments and Users as appropriate. Serialize your scrape results so that each Tweet and each TwitterUser is a single line containing first the class name ('tweet' or 'twitter_user'), followed by its constituent fields, in order, separated by tabs.

The RecordStreamer takes each such line, constructs its corresponding class, and instantiates it with the remaining fields in order:

<pre><code>
require 'wukong'
require 'my_blog' # defines the blog models

module TwitBlog
  class Mapper < Wukong::Streamer::RecordStreamer
    # Watch for tweets by me
    MY_USER_ID = 24601
    # structs for our input objects
    Tweet = Struct.new( :id, :created_at, :twitter_user_id,
      :in_reply_to_user_id, :in_reply_to_status_id, :text )
    TwitterUser = Struct.new( :id, :username, :fullname,
      :homepage, :location, :description )
    #
    # If this tweet is by me, convert it to a Post.
    #
    # If it is a tweet not by me, convert it to a Comment that
    # will be paired with the correct Post.
    #
    # If it is a TwitterUser, convert it to a User record and
    # a user_location record.
    #
    def process record
      case record
      when TwitterUser
        user     = MyBlog::User.new.merge(record) # grab the fields in common
        user_loc = MyBlog::UserLoc.new(record.id, record.location, nil, nil)
        yield user
        yield user_loc
      when Tweet
        if record.twitter_user_id == MY_USER_ID
          post = MyBlog::Post.new.merge record
          post.link  = "http://twitter.com/statuses/show/#{record.id}"
          post.body  = record.text
          post.title = record.text[0..65] + "..."
          yield post
        else
          comment = MyBlog::Comment.new.merge record
          comment.body    = record.text
          comment.post_id = record.in_reply_to_status_id
          yield comment
        end
      end
    end
  end
end
Wukong::Script.new( TwitBlog::Mapper, nil ).run # identity reducer
</code></pre>

h2. Uniqifying

The script above uses the identity reducer: every record from the mapper is sent to the output. But what if you had grabbed the replying user's record every time you saw a reply? Fine, so pass it through @uniq@. But what if a user updated their location or description during this time? You'll probably want to use a UniqByLastReducer, which keeps only the last record seen for each key.

Location is different: there you might want to take the most /frequent/ value, and might as well geolocate the location text while you're at it. Use a ListReducer, find the most frequent element, then finally call the expensive geolocation method.
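Here's a rough sketch of both reducers, written against the same @AccumulatingReducer@ and @ListReducer@ interfaces as the word-count script above. Don't take this as Wukong's canonical UniqByLastReducer; it's just an illustration. For the sketch, assume each location record is a simple @[user_id, location_text]@ pair, and note that @geolocate@ is a hypothetical stand-in for your favorite geocoder:

<pre><code>
require 'wukong'

module TwitBlog
  # Keep only the last-seen record for each key --
  # a sketch in the spirit of a UniqByLastReducer.
  class UniqByLastReducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :final_value

    # Forget whatever we held for the previous key
    def start! *args
      self.final_value = nil
    end

    # Later records overwrite earlier ones, so the last one wins
    def accumulate *record
      self.final_value = record
    end

    # Emit the survivor
    def finalize
      yield final_value if final_value
    end
  end

  # Take the most frequent location text for each user, then
  # geolocate it once per user rather than once per record.
  class LocationReducer < Wukong::Streamer::ListReducer
    def finalize
      return if values.empty?
      # values holds [user_id, location_text] pairs; tally the texts
      # and pick the one that occurs most often
      best = values.map(&:last).group_by{|text| text }.values.max_by(&:size).first
      lat, lng = geolocate(best) # expensive -- call it once per key
      yield [ key, best, lat, lng ]
    end

    # Hypothetical stand-in for a real geocoder call
    def geolocate(text)
      [nil, nil]
    end
  end
end
</code></pre>

Note that the list reducer holds all of a key's values in memory before finalizing, so the last-seen approach is the cheaper one when a key can have many records.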
h2. A note about keys

Now we're going to write this using the synthetic keys already extant in the twitter records, making the unwarranted assumption that they won't collide with the keys in your database.

The Map/Reduce paradigm does poorly with synthetic keys. Synthetic keys demand locality, and map/reduce's remarkable scaling comes from not assuming locality. In general, write your map/reduce scripts to use natural keys (the screen name, not the synthetic numeric id).

h2. More info

There are many useful examples (including an actually-useful version of this WordCount script) in the examples/ directory.
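To give one a spin, Wukong scripts are invoked from the command line; assuming the standard runner flags (and local files rather than a Hadoop cluster), the invocation looks something like the following. The filename here is a guess, so check the directory for the actual script names:

<pre><code>
ruby examples/word_count.rb --run=local path/to/input_files path/to/output_dir
</code></pre>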