Sha256: feab29d5fe0f32fd73c5e2f1f5d5cea5c94789252a2753a66968cd7e8c9f99bf
Contents?: true
Size: 1.46 KB
Versions: 19
Compression:
Stored size: 1.46 KB
Contents
#!/usr/bin/env ruby
# Make the library directory two levels up from this script loadable.
$LOAD_PATH << "#{File.dirname(__FILE__)}/../../lib"
require 'wukong'

# A directed link with a source and a destination field.
class Edge < Struct.new(:src, :dest); end

# A node pair carrying one slot per relationship kind and direction.
class MultiEdge < Struct.new(
  :src, :dest,
  :a_follows_b,   :b_follows_a,
  :a_replies_b,   :b_replies_a,
  :a_favorites_b, :b_favorites_a
); end

module Gen1HoodEdges
  #
  # Emits every incoming edge twice, once keyed on each endpoint:
  #
  #   [dest, 'i', src]   -- src is an inbound neighbor of dest
  #   [src,  'o', dest]  -- dest is an outbound neighbor of src
  #
  class Mapper < Wukong::Streamer::Base
    # The first field (rsrc) is accepted but not used.
    def process(rsrc, src, dest)
      # Disabled filter, preserved from the original:
      # next if (src.to_i == 0) || (dest.to_i == 0)
      yield [dest, 'i', src]
      yield [src, 'o', dest]
    end
  end

  #
  # Collects ( !!in memory!! ) every inbound neighbor of the middle node.
  #
  # Each outbound link seen afterwards is paired with all inbound neighbors
  # collected so far, emitting one ('path_2', in, mid, out) record per pair.
  # Only inbound records that arrive before an outbound record are paired
  # with it -- presumably the sort on the first two fields guarantees that
  # 'i' records precede 'o' records for a key; confirm against the job setup.
  #
  class Reducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :ins

    # Resets the inbound-neighbor list.
    def start!(*args)
      self.ins = []
    end

    # Stores inbound neighbors; expands outbound ones into two-hop paths.
    def accumulate(mid, dir, node)
      case dir
      when 'i'
        ins << node
        count = ins.length
        # Progress note on stderr: every 1000th entry once past 10000.
        if (count > 10000) && (count % 1000 == 0)
          $stderr.puts ["Accumulating:", mid, count].join("\t")
        end
      when 'o'
        ins.each { |inbound| yield ['path_2', inbound, mid, node] }
      end
    end

    # Nothing to emit at the end of a group; paths are emitted in accumulate.
    def finalize
    end

    # Records are grouped on their first field, the middle node.
    def get_key(mid, *_)
      mid
    end
  end

  # Job wrapper: sort on the first two fields, partition on the first.
  class Script < Wukong::Script
    def default_options
      super.merge(:sort_fields => 2, :partition_fields => 1)
    end
  end
end

# Execute the script
Gen1HoodEdges::Script.new(Gen1HoodEdges::Mapper, Gen1HoodEdges::Reducer).run
Version data entries
19 entries across 19 versions & 2 rubygems