-- One iteration of PageRank over page-to-page edges (Apache Pig Latin).
--
-- Input:   'data/common_pages'         one edge per record: (ip, from_path, into_path)
-- Outputs: 'data/pagerank/pr_iter_00'  initial adjacency list, every pagerank = 1.0
--          'data/pagerank/pr_iter_01'  the same record layout after one iteration
--
-- The PARALLEL parameter must be supplied by the caller
-- (for example: pig -param PARALLEL=10 <script>).

common_pages = LOAD 'data/common_pages'
    AS (ip:chararray, from_path:chararray, into_path:chararray);

--
-- Build the adjacency list (from_path, pagerank, {into_path, ...}) from the edges.
--
adj_list_j = GROUP common_pages BY from_path;
adj_list = FOREACH adj_list_j GENERATE
    group                  AS from_path,
    1.0F                   AS pagerank:float,   -- every page starts with rank 1.0
    common_pages.into_path AS into_paths;       -- bag of destination paths for this page
STORE adj_list INTO 'data/pagerank/pr_iter_00';

--
-- Iterate pagerank: turn (from_path, pagerank, into_paths) into a record of
-- the same layout carrying the updated rank.
--

-- Find the partial share, A.pagerank / A.into_paths.length,
-- and dispatch one share to each page that A links to.
sent_shares = FOREACH adj_list GENERATE
    FLATTEN(into_paths)                              AS path,
    (float)(pagerank / (float)SIZE(into_paths))      AS share:float;

-- Dispatch the edge list to yourself, so the links are still around
-- when the output record is assembled.
sent_edges = FOREACH adj_list GENERATE
    from_path AS path,
    into_paths;

-- Assemble all the received shares together with the self-sent edge list.
-- INNER on sent_edges keeps only paths that have an edge list of their own;
-- NOTE(review): paths that only receive links (never appear as from_path)
-- are dropped here -- confirm that is intended.
rcvd_shares = COGROUP sent_edges BY path INNER, sent_shares BY path PARALLEL $PARALLEL;

-- Calculate the new rank and emit a record that looks just like the input.
next_iter = FOREACH rcvd_shares {
    raw_rank    = (float)SUM(sent_shares.share);
    -- Treat the case that a node has no in-links: SUM over an empty bag is
    -- null, so such a page is assigned 0.0 instead of the damped value.
    -- NOTE(review): the textbook formula would give 0.15 (the 1 - d term)
    -- here rather than 0.0 -- left as-is; confirm which is wanted.
    damped_rank = ((raw_rank IS NOT NULL AND raw_rank > 1.0e-12f) ? raw_rank*0.85f + 0.15f : 0.0f);
    GENERATE
        group                          AS from_path,
        damped_rank                    AS pagerank,    -- 'rank' is a reserved word in newer Pig
        FLATTEN(sent_edges.into_paths) AS into_paths;  -- unwrap the single self-sent edge bag
};
STORE next_iter INTO 'data/pagerank/pr_iter_01';