#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
require 'wukong'

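#
# One iteration of PageRank as a Wukong map/reduce script. The reducer's output
# has the same layout as the mapper's input, so it can be fed back in to iterate.
#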

module PageRank
  #
  # Damping factor: the probability of following a link at each step, so that
  # (1 - DAMPING_FACTOR) is the probability of a 'random' jump.
  # 0.85 works well in practice. See http://en.wikipedia.org/wiki/Pagerank
  #
  DAMPING_FACTOR = 0.85

  #
  # Each user's line looks like
  #
  #   user_a    pagerank        id1,id2,...,idN
  #
  # we need to disperse this user's pagerank to each of id1..idN, and
  # rendezvous the list of outbound links at user_a's reducer as well.
  #
  module Iterating
    class Mapper < Wukong::Streamer::Base
      #
      # Handle one record of the form (src, pagerank, dests_str):
      # emit a pagerank share for every destination, then emit the cleaned
      # destination list keyed by the source itself.
      #
      # src       - id of the node this record describes
      # pagerank  - the node's current pagerank (converted with to_f)
      # dests_str - comma-separated destination ids, optionally wrapped in
      #             Pig-style (), {} punctuation
      #
      def process src, pagerank, dests_str, &block
        # Pig-generated input wraps ids in parens/braces; strip those characters
        cleaned = dests_str.gsub(/[\(\{\}\)]/, '')
        yield_pagerank_shares src, pagerank, cleaned.split(","), &block
        yield_own_dest_list   src, cleaned, &block
      end

      #
      # Split the source node's pagerank evenly over its out-nodes, yielding
      # one [dest, 'p', share] record per destination.
      #
      # With no destinations nothing is yielded (the Float division by zero
      # does not raise and its result is unused).
      #
      def yield_pagerank_shares src, pagerank, dests
        share = pagerank.to_f / dests.length
        dests.each { |dest| yield [dest, 'p', share] }
      end

      #
      # Yield [src, 'd', dests_str] so the out-node list rendezvouses with
      # the pagerank shares addressed to this same node.
      #
      def yield_own_dest_list src, dests_str
        yield [src, 'd', dests_str]
      end
    end

    class Reducer < Wukong::Streamer::AccumulatingReducer
      attr_accessor :node_id, :pagerank, :dests_str

      # Reset state for a new node: zero accumulated pagerank, no dest list yet
      def start! node_id, *args
        @node_id   = node_id
        @pagerank  = 0.0
        @dests_str = nil
      end

      # Fold one record into the current node's state:
      #   'p' - a fractional pagerank contribution, added to the running total
      #   'd' - the node's own destination list, stored as-is
      # Any other tag raises a RuntimeError.
      def accumulate node_id, what, val
        if what == 'p'
          self.pagerank += val.to_f
        elsif what == 'd'
          self.dests_str = val
        else
          raise "Don't know how to accumulate #{[node_id, what, val].inspect}"
        end
      end

      # Emit [node_id, damped pagerank, dest list] in the same layout the
      # mapper reads, so the output can be fed back into this script.
      # A node with no (or a blank) dest list gets the placeholder 'dummy'.
      def finalize
        followed_links = self.pagerank * DAMPING_FACTOR
        random_jump    = 1 - DAMPING_FACTOR
        self.dests_str = 'dummy' if self.dests_str.blank?
        yield [node_id, followed_links + random_jump, dests_str]
      end
    end

    class Script < Wukong::Script
      # Inherited default options plus an :extra_args entry that passes
      # `-jobconf io.sort.record.percent=0.25` through to the job.
      # NOTE(review): presumably a Hadoop sort-buffer tuning knob -- confirm
      # against the Hadoop configuration reference.
      def default_options
        job_tuning = { :extra_args => ' -jobconf io.sort.record.percent=0.25 ' }
        super.merge(job_tuning)
      end
    end
    # Build the script from this module's Mapper and Reducer and run it.
    # This executes as soon as the file is loaded.
    Script.new(Mapper, Reducer).run
  end
end