Skip to content

Commit 156ed1c

Browse files
committed May 31, 2021
Revert "Revert "Merge branch 'main' of https://github.com/edofazza/PageRank-Spark into main""
This reverts commit 8ff1f85.
1 parent 8ff1f85 commit 156ed1c

File tree

11 files changed

+2469
-10
lines changed

11 files changed

+2469
-10
lines changed
 

‎README.md

+22
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,24 @@
11
# PageRank-Spark
2+
Implementation of the MapReduce PageRank algorithm using the Spark framework both in Python and in Java.
23

4+
## How to run the algorithm
5+
Python version: `spark-submit page_rank.py <input file> <output> <number of iterations>`
6+
7+
Java version: `spark-submit --class PageRank <app Jar> <input file> <output> <number of iterations>`
8+
9+
## Input file
10+
The inputs to the program are pages from the Simple English Wikipedia. We will be using a pre-processed version of the Simple Wikipedia corpus in which the pages are stored in an XML format.
11+
The XML file can be found [here](wiki-micro.txt).
12+
13+
Each page of Wikipedia is represented in XML as follows:
14+
15+
<title>page name</title>
16+
...
17+
<revision optionalVal="xxx">
18+
...
19+
<text optionalVal="yyy">page content</text>
20+
...
21+
</revision>
22+
23+
The pages have been "flattened" to be represented on a single line. The body text of the page also has all new lines converted to spaces to ensure it stays on one line in this representation.
24+
Links to other Wikipedia articles are of the form [[page name]] and **we considered only links in the _text_ section**.
473 Bytes
Binary file not shown.

‎pagerank_Java/src/main/java/PageRank.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import org.apache.spark.api.java.JavaPairRDD;
33
import org.apache.spark.api.java.JavaRDD;
44
import org.apache.spark.api.java.JavaSparkContext;
5-
import org.apache.spark.broadcast.Broadcast;
65
import scala.Tuple2;
76

87
import java.util.ArrayList;
@@ -13,16 +12,18 @@ public class PageRank {
1312
private static final double DUMPING_FACTOR = 0.8;
1413

1514
public static void main(String[] args) {
15+
if (args.length != 3) {
16+
System.err.println("Usage: PageRank <input path> <output path> <# of iterations>");
17+
System.exit(-1);
18+
}
19+
1620
// import context from Spark (distributed computing using yarn, set name of the application)
1721
SparkConf sparkConf = new SparkConf().setAppName("pageRankJava").setMaster("yarn");
1822
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
1923

2024
// import input data from txt file to rdd
2125
JavaRDD<String> inputDataRDD = javaSparkContext.textFile(args[0]);
2226

23-
// the damping factor (static) is broadcast
24-
Broadcast<Double> DUMPING_FACTOR_BR = javaSparkContext.broadcast(DUMPING_FACTOR);
25-
2627
// count number of nodes in the input dataset (the N number)
2728
long nodesNumber = inputDataRDD.count();
2829

@@ -45,7 +46,7 @@ public static void main(String[] args) {
4546

4647
// aggregate contributions for each node, compute final ranks
4748
pageRankRDD = consideredContributionsRDD.reduceByKey(Double::sum)
48-
.mapValues(summedContributions -> (1 - DUMPING_FACTOR_BR.value()) / nodesNumber + DUMPING_FACTOR_BR.value() * summedContributions);
49+
.mapValues(summedContributions -> (1 - DUMPING_FACTOR) / nodesNumber + DUMPING_FACTOR * summedContributions);
4950
}
5051

5152
// sort by value (pagerank)
1.44 KB
Binary file not shown.
9.05 KB
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#Generated by Maven
2+
#Mon May 31 17:47:46 WEST 2021
3+
version=1.0-SNAPSHOT
4+
groupId=baggins
5+
artifactId=pagerank_Java
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
PageRank.class
2+
DataParser.class
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/home/hadoop/pagerank_Java/src/main/java/DataParser.java
2+
/home/hadoop/pagerank_Java/src/main/java/PageRank.java
Binary file not shown.

‎pagerank/page_rank.py ‎pagerank_Python/page_rank.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
import re
33
import sys
44

5+
# the damping factor (static)
6+
DAMPING_FACTOR = 0.8
7+
58

69
def data_parser(line):
710
# get the index of the begin of the title
@@ -46,9 +49,6 @@ def spread_rank(node, outgoing_links, rank):
4649
# import input data from txt file to rdd
4750
input_data_rdd = sc.textFile(sys.argv[1], 2)
4851

49-
# the damping factor (static) is broadcast
50-
DAMPING_FACTOR_BR = sc.broadcast(0.8)
51-
5252
# count number of nodes in the input dataset (the N number)
5353
node_number = input_data_rdd.count()
5454

@@ -72,8 +72,8 @@ def spread_rank(node, outgoing_links, rank):
7272

7373
# aggregate contributions for each node, compute final ranks
7474
page_ranks = considered_contributions.reduceByKey(lambda x, y: x + y) \
75-
.mapValues(lambda summed_contributions: (float(1 - DAMPING_FACTOR_BR.value) / node_number) +
76-
(DAMPING_FACTOR_BR.value * float(summed_contributions)))
75+
.mapValues(lambda summed_contributions: (float(1 - DAMPING_FACTOR) / node_number) +
76+
(DAMPING_FACTOR * float(summed_contributions)))
7777

7878
# sort by value (pagerank)
7979
sorted_page_ranks = page_ranks.sortBy(lambda page: page[1], False, 12)

‎wiki-micro.txt

+2,427
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.