h1. Using Wukong and Wuclan, Part 3 - Parsing

In part 2 we began a scraper to trawl our desired part of the social web. Now we're ready to start using Wukong to process the files.

Files come off the wire as

|_. Field |_. Type |
| @:url@ | String |
| @:scraped_at@ | DateTime (flat) |
| @:response_code@ | Integer |
| @:response_message@ | String |
| @:contents@ | String (JSON-formatted, tab- and newline-munged) |

The contents field is a JSON-formatted mix of records:

* TwitterFollowersRequest and TwitterFriendsRequest yield an @Array[Hash{user => raw_tweet}]@. We want to extract a stream of AFollowsB (with the request user as user_a for a friends request and user_b for a followers request) along with the included Tweet, TwitterUser, TwitterUserProfile and TwitterUserStyle records.
* TwitterFavoritesRequest yields an @Array[Hash{tweet_hash => user_hash}]@. We want to extract a stream of AFavoritesB along with the included Tweet, TwitterUser, TwitterUserProfile and TwitterUserStyle records.
* TwitterUser yields a single @user_hash@, making one each of TwitterUser, TwitterUserProfile and TwitterUserStyle.
* UserTimelineRequest and PublicTimelineRequest yield an @Array[Hash{tweet => user}]@. We want to extract the included Tweet, TwitterUser, TwitterUserProfile and TwitterUserStyle records.
* TwitterFollowerIdsRequest and TwitterFriendIdsRequest return an @Array[user_ids]@ (each user_id is a simple Integer). We extract a series of AFollowsB (using the request's user_id as user_a_id or user_b_id).

We want to split each API response into a stream of those TwitterUser, Tweet, etc. records:

# Stream in each line (each line holds one request).
# Turn the line into the corresponding TwitterRequest.
# Have the TwitterRequest parse its JSON contents and construct the TwitterUser, Tweet, etc.
# Serialize those records back out as tab-separated lines suitable for further processing with Wukong.

h4. The basics of StructStreamer

Wukong handles the first and last steps through its StructStreamer and the standard @.to_flat@ method.
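
As a rough illustration of those first and last steps, here is a minimal, Wukong-free sketch. It is not Wukong's implementation: the @Struct@ only borrows the request's name and field list, and @KLASSES@, @unflatten@ and the example URL are hypothetical names made up for this example.

```ruby
# Hypothetical stand-in for a scraped request record. The field list follows
# the one given above, but this Struct is NOT the real Wuclan class.
TwitterFriendsRequest = Struct.new(:url, :scraped_at, :response_code,
                                   :response_message, :contents) do
  # Rough analogue of a to_flat method: resource name first, then the fields.
  def to_flat
    ['twitter_friends_request', *to_a]
  end
end

# Hypothetical lookup from the first field of a line to the class to build.
KLASSES = { 'twitter_friends_request' => TwitterFriendsRequest }.freeze

# Turn one tab-separated line into a record object.
def unflatten(line)
  # limit of -1 keeps trailing empty fields instead of dropping them
  name, *fields = line.chomp.split("\t", -1)
  KLASSES.fetch(name).new(*fields)
end

# A made-up input line; the URL and contents are placeholders.
line = ['twitter_friends_request', 'http://example.com/friends.json',
        '20090701123456', '200', 'OK', '[{"id":1},{"id":2}]'].join("\t")

request = unflatten(line)
puts request.contents                # still a plain String at this point
puts request.to_flat.join("\t")      # back to a tab-separated line
```

In the real script Wukong takes care of this plumbing for you.
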
So the actual structure is really simple:

bc. # Instantiate each incoming request.
# Stream out the contained classes it generates.
class TwitterRequestParser < Wukong::Streamer::StructStreamer
  def process request
    request.parse do |obj|
      yield obj
    end
  end
end
# This makes the script go.
Wukong::Script.new(TwitterRequestParser, nil).run

In practice, all you need to know is that a StructStreamer gets a stream of objects to parse. Here's an outline of its internals. The Wukong StructStreamer:

# takes each flattened line: @"twitter_friends_request http://.... 20090701123456 ...fields... [{...}, {...}, ...json..., {...}]"@
# splits by tabs to create an array of fields: @["twitter_friends_request", "http://...", ... "[{...}, {...}, ...json..., {...}]"]@
# constructs the class named in the first field, using the values extracted from the remaining fields: @TwitterFriendsRequest.new "http://...", "20090701123456", ... "[{...}, {...}, ...json..., {...}]"@

The last (contents) field is still just a string: there's nothing special about it to Wukong.

h4. Parsing

Since each request's contents are handled in a slightly different (and brittle) manner, we just ask each request object to parse itself and feed out all the TwitterXXXX objects it generates.

bc. class TwitterFollowersRequest
  # ...
  def parse &block
    return unless healthy?
    # for each raw user/tweet pair in the parsed JSON contents,
    parsed_contents.each do |hsh|
      json_obj = JsonUserWithTweet.new(hsh, 'scraped_at' => scraped_at)
      next unless json_obj && json_obj.healthy?
      # Extract user, tweet and relationship
      yield AFollowsB.new(json_obj.user.id, self.twitter_user_id) if json_obj.user
      json_obj.each(&block)
    end
  end
  # ...
end

The contents of the TwitterXXXRequest objects consist of one or many hashes, each holding either (a raw user hash, and possibly its latest raw tweet hash) or (a raw tweet hash and its raw user hash).
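
To make the two shapes concrete, here is a small sketch that splits either kind of hash into a (user, tweet) pair. The payloads are invented, and the key names (@status@, @user@, @screen_name@, @text@) are assumptions about the raw JSON rather than something taken from the Wuclan source; @split_pair@ is a hypothetical helper.

```ruby
require 'json'

# Illustrative payloads only; key names are assumed, values are made up.
user_with_tweet = JSON.parse(
  '{"id": 11, "screen_name": "alice", "status": {"id": 901, "text": "hello"}}')
tweet_with_user = JSON.parse(
  '{"id": 902, "text": "world", "user": {"id": 12, "screen_name": "bob"}}')

# Split a raw hash into [raw_user, raw_tweet], whichever way round it nests.
# raw_tweet is nil when a user hash carries no tweet.
def split_pair(raw)
  if raw.key?('user')
    # a tweet hash carrying its user
    [raw['user'], raw.reject { |key, _| key == 'user' }]
  else
    # a user hash, possibly carrying its latest tweet
    [raw.reject { |key, _| key == 'status' }, raw['status']]
  end
end

p split_pair(user_with_tweet)
p split_pair(tweet_with_user)
```

Either way the caller ends up with the same two pieces, which is what lets one adapter interface serve every request type.
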
The user hash might have only the fields for a TwitterPartialUser, or it might have the fields for a full set of TwitterUser, TwitterUserProfile and TwitterUserStyle. Besides which, the fields themselves need some massaging to be compatible with Wukong and other tools in our Map/Reduce toolkit (details explained in a later section).

The fiddly little details are handled by a JsonUserWithTweet or JsonTweetWithUser adapter (as appropriate):

bc. class JsonUserTweetPair
  def initialize raw, moreinfo
    # clean up fields in entries (flatten date, true/false -> 1/0, etc)
    fix_raw_user!
    fix_raw_tweet!
  end
  # generate all the contained TwitterXXX objects
  def each
    # ...
  end
  # create TwitterUser object from raw info
  def user
  end
  # create Tweet object from raw tweet hash
  def tweet
  end
  # ... and so forth
end

I'll ignore the gory details; view the source if you're interested.

h4. Running the script

Here, again, is the code (in full!) for the twitter_request_parser.rb script.

bc. # Instantiate each incoming request.
# Stream out the contained classes it generates.
class TwitterRequestParser < Wukong::Streamer::StructStreamer
  def process request
    request.parse do |obj|
      yield obj
    end
  end
end
# This makes the script go.
Wukong::Script.new(TwitterRequestParser, nil).run

That last line is the runner: it makes this a Wukong script with a map phase only. (We'll add in a reducer later on.)
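
Returning to the field cleanup the adapter performs (flatten date, true/false -> 1/0), here is one way it might look. This is a sketch under assumptions, not the actual @fix_raw_user!@ or @fix_raw_tweet!@: the helper names, the @created_at@ key, the incoming date layout and the whitespace squashing are all guesses made for illustration.

```ruby
require 'date'

# Assumed layout of the incoming date string, e.g.
# "Wed Jul 01 12:34:56 +0000 2009". The real feed may differ.
RAW_DATE_FORMAT = '%a %b %d %H:%M:%S %z %Y'

# Convert a raw date string to a flat UTC timestamp (YYYYmmddHHMMSS).
def flatten_date(str)
  DateTime.strptime(str, RAW_DATE_FORMAT).new_offset(0).strftime('%Y%m%d%H%M%S')
end

# Hypothetical in-place cleanup of one raw hash:
# * booleans become 1/0
# * strings under date_keys become flat timestamps
# * other strings lose tabs and newlines (an assumption, so that they
#   cannot break tab-separated output)
def fix_raw!(raw, date_keys: ['created_at'])
  raw.each do |key, val|
    case val
    when true  then raw[key] = 1
    when false then raw[key] = 0
    when String
      raw[key] = if date_keys.include?(key)
                   flatten_date(val)
                 else
                   val.gsub(/[\t\r\n]+/, ' ')
                 end
    end
  end
  raw
end

# Made-up example record.
tweet = { 'id' => 901, 'created_at' => 'Wed Jul 01 12:34:56 +0000 2009',
          'favorited' => false, 'text' => "two\tfields\nhere" }
p fix_raw!(tweet)
```

Only existing keys are reassigned while iterating, which Ruby permits; adding new keys inside the loop would raise an error.
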