1
+ import java .util .*;
2
+ import java .lang .*;
3
+ import java .io .*;
4
+
5
+ import org .apache .hadoop .io .IntWritable ;
6
+ import org .apache .hadoop .io .LongWritable ;
7
+ import org .apache .hadoop .io .Text ;
8
+
9
+ import org .apache .logging .log4j .Logger ;
10
+ import org .apache .logging .log4j .LogManager ;
11
+
12
+ import org .apache .hadoop .conf .Configuration ;
13
+ import org .apache .hadoop .fs .Path ;
14
+ import org .apache .hadoop .io .IntWritable ;
15
+ import org .apache .hadoop .io .Text ;
16
+ import org .apache .hadoop .mapreduce .Job ;
17
+ import org .apache .hadoop .mapreduce .Mapper ;
18
+ import org .apache .hadoop .mapreduce .Reducer ;
19
+ import org .apache .hadoop .mapreduce .lib .input .TextInputFormat ;
20
+ // import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
21
+ import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
22
+ import org .apache .hadoop .mapreduce .lib .chain .ChainMapper ;
23
+ // import org.apache.hadoop.mapred.JobConf;
24
+ import org .apache .hadoop .fs .FileSystem ;
25
+ // import java.io.OutputStreamWriter;
26
+ import org .apache .hadoop .conf .Configured ;
27
+ import org .apache .hadoop .util .Tool ;
28
+ import org .apache .hadoop .util .ToolRunner ;
29
+
30
+ public class CRT extends Configured implements Tool {
31
+ static Boolean OPTION ;
32
+ // static final int[] PRIMES = {2,3,5,7,11,13,17,19,23,29,31,37};
33
+ static final int [] PRIMES = {23 ,29 ,31 ,37 ,41 ,43 ,47 ,53 ,59 ,61 ,67 };
34
+ static final int NUMBER = 11 ;
35
+ static int FAILED_NUMBER ;
36
+ static int x1 ;
37
+ static int x2 ;
38
+
39
+ static final Logger logger = LogManager .getLogger ();
40
+ static Configuration conf = new Configuration ();
41
+
42
+ public static void addProperty () throws IOException {
43
+ Properties prop = new Properties ();
44
+ InputStream input = null ;
45
+ input = new FileInputStream ("config.properties" );
46
+ // load a properties file
47
+ prop .load (input );
48
+ // get the property value and print it out
49
+ OPTION = Boolean .valueOf (prop .getProperty ("OPTION" ));
50
+ FAILED_NUMBER = Integer .valueOf (prop .getProperty ("FAILED_NUMBER" ));
51
+ long mx = 1 ;
52
+ for (int i =0 ;i <PRIMES .length ;i ++)
53
+ mx = mx * PRIMES [i ];
54
+ long tmp = (long )(Math .random ()*50 )+1 ;
55
+ x1 = (int )(Math .sqrt (mx )-tmp );
56
+ tmp = (long )(Math .random ()*50 )+1 ;
57
+ x2 = (int )(Math .sqrt (mx )-tmp );
58
+ // System.out.println("X1 and X2 is: "+x1+" "+x2);
59
+ System .out .println ("The First Number=" +x1 );
60
+ System .out .println ("The Second Number=" +x2 );
61
+ // System.out.println("X1 and X2 is: "+x1+" "+x2);
62
+ }
63
+
64
+ private static int [] failNode (){
65
+ int n = NUMBER ;
66
+ int [] ans = new int [n ];
67
+ int [] rand_number = new int [n ];
68
+ Random rand = new Random ();
69
+ for (int i =0 ;i <n ;i ++){
70
+ rand_number [i ] = i ;
71
+ ans [i ] = 0 ;
72
+ }
73
+ for (int i =0 ;i <FAILED_NUMBER ;i ++){
74
+ int pos = rand .nextInt (n -i );
75
+ // swap(rand[n-i],rand[pos]);
76
+ int tmp = rand_number [pos ];
77
+ rand_number [pos ] = rand_number [n -i -1 ];
78
+ rand_number [n -i -1 ] = tmp ;
79
+ ans [tmp ] = 1 ;
80
+ }
81
+ return ans ;
82
+ }
83
+
84
+ public static void init () throws IOException {
85
+ int [] fail_node = failNode ();
86
+ int LEN = PRIMES .length ;
87
+ int [][] matrixA = new int [NUMBER ][LEN ];
88
+ // If with coding, the maximal mapper number equals to number, otherwise the maximal mapper number equals to the prime's number
89
+ int max_num = OPTION ? NUMBER : LEN ;
90
+
91
+ if (OPTION ){
92
+ // CRTMatrixA mtx = new CRTMatrixA(LEN,NUMBER,FAILED_NUMBER,PRIMES);
93
+ CRTMatrixD mtx = new CRTMatrixD (LEN ,NUMBER ,FAILED_NUMBER ,PRIMES );
94
+ matrixA = mtx .run ();
95
+ }
96
+
97
+ String tmpoutput ="hdfs://localhost:9000/user/mio/crt/input/data" ;
98
+ String output ;
99
+ FileSystem Phdfs = FileSystem .get (new Configuration ());
100
+ String fileStr = "" ;
101
+
102
+ for (int i =0 ;i <max_num ;i ++){
103
+ fileStr = (fail_node [i ]==1 ) ? "1 " : "0 " ; // 1 means the node will fail
104
+ if (OPTION ){
105
+ int multi = 1 ;
106
+ String tstr = "" ;
107
+ for (int j =0 ;j <LEN ;j ++){
108
+ if (matrixA [i ][j ]==1 ){
109
+ multi *= PRIMES [j ];
110
+ tstr += String .valueOf (PRIMES [j ])+" " ;
111
+ }
112
+ }
113
+ conf .set (String .valueOf (multi ),tstr );
114
+ fileStr += String .valueOf (multi )+" " +String .valueOf (x1 )+" " +String .valueOf (x2 );
115
+ System .out .println ("Config: " +tstr );
116
+ } else {
117
+ fileStr += String .valueOf (PRIMES [i ])+" " +String .valueOf (x1 )+" " +String .valueOf (x2 );
118
+ }
119
+ output = tmpoutput + String .valueOf (i ) +".txt" ;
120
+ Path fname =new Path (output );
121
+ BufferedWriter out =new BufferedWriter (new OutputStreamWriter (Phdfs .create (fname ,true )));
122
+ System .out .println (fileStr );
123
+ out .write (fileStr );
124
+ out .close ();
125
+ }
126
+ }
127
+
128
+ @ Override
129
+ public int run (String [] args ) throws Exception {
130
+ Job job = Job .getInstance (conf , "crt" );
131
+ job .setJarByClass (CRT .class );
132
+
133
+ job .setMapperClass (CRTMapper .class );
134
+ job .setMapOutputKeyClass (Text .class );
135
+ job .setMapOutputValueClass (IntWritable .class );
136
+
137
+ job .setReducerClass (CRTReducer .class );
138
+ job .setNumReduceTasks (1 );
139
+ job .setOutputKeyClass (Text .class );
140
+ job .setOutputValueClass (LongWritable .class );
141
+
142
+ TextInputFormat .addInputPath (job , new Path (args [0 ]));
143
+ FileOutputFormat .setOutputPath (job , new Path (args [1 ]));
144
+
145
+ return job .waitForCompletion (true ) ? 0 : 1 ;
146
+ }
147
+
148
+ public static void main (String [] args ) throws Exception {
149
+ addProperty ();
150
+ conf .setBoolean ("OPTION" ,OPTION );
151
+ System .out .println ("Config is: " +OPTION +" " +FAILED_NUMBER );
152
+ init ();
153
+ long start = new Date ().getTime ();
154
+ int exitCode = ToolRunner .run (new CRT (),args );
155
+ long end = new Date ().getTime ();
156
+ System .out .println ("Whole time=" +(end -start ));
157
+ System .exit (exitCode );
158
+ }
159
+ }
0 commit comments