1
+ /*
2
+ * Copyright 2023 The original authors
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+ package dev .morling .onebrc ;
17
+
18
+ import java .io .*;
19
+ import java .nio .ByteBuffer ;
20
+ import java .nio .MappedByteBuffer ;
21
+ import java .nio .channels .FileChannel ;
22
+ import java .nio .charset .StandardCharsets ;
23
+ import java .nio .file .Path ;
24
+ import java .nio .file .Paths ;
25
+ import java .nio .file .StandardOpenOption ;
26
+ import java .util .*;
27
+ import static java .nio .channels .FileChannel .MapMode .READ_ONLY ;
28
+
29
+ public class CalculateAverage_mattiz {
30
+ private static final int TWO_BYTE_TO_INT = 480 + 48 ; // 48 is the ASCII code for '0'
31
+ private static final int THREE_BYTE_TO_INT = 4800 + 480 + 48 ;
32
+ private static final String FILE = "./measurements.txt" ;
33
+ public static final int PARTS = 8 ;
34
+
35
+ public static void main (String [] args ) throws Exception {
36
+ var result = new CalculateAverage_mattiz ().calculate (FILE , PARTS );
37
+ System .out .println (result );
38
+ }
39
+
40
+ StationList calculate (String file , int numParts ) throws Exception {
41
+ var buffers = createBuffers (Paths .get (file ), numParts );
42
+
43
+ return buffers
44
+ .parallelStream ()
45
+ .map (this ::aggregate )
46
+ .reduce (StationList ::merge )
47
+ .orElseThrow ();
48
+ }
49
+
50
+ record BufferAndSize (ByteBuffer buffer , long size ) {
51
+ }
52
+
53
+ List <ByteBuffer > createBuffers (Path file , int numParts ) throws IOException {
54
+ FileChannel fileChannel = FileChannel .open (file , StandardOpenOption .READ );
55
+
56
+ var fileSize = fileChannel .size ();
57
+
58
+ if (fileSize < (1024 * 1024 )) { // Only one core for small files
59
+ numParts = 1 ;
60
+ }
61
+
62
+ var chunkSize = fileSize / numParts ;
63
+ var buffers = new ArrayList <ByteBuffer >();
64
+ long filePointer = 0 ;
65
+
66
+ for (int i = 0 ; i < numParts ; i ++) {
67
+ if (i != numParts - 1 ) { // not last element
68
+ var adjustedChunkSize = getBuffer (fileChannel , filePointer , chunkSize , true );
69
+ buffers .add (adjustedChunkSize .buffer ());
70
+ filePointer += adjustedChunkSize .size ();
71
+ }
72
+ else {
73
+ var adjustedChunkSize = getBuffer (fileChannel , filePointer , fileSize - filePointer , false );
74
+ buffers .add (adjustedChunkSize .buffer ());
75
+ }
76
+ }
77
+
78
+ return buffers ;
79
+ }
80
+
81
+ BufferAndSize getBuffer (FileChannel fileChannel , long start , long size , boolean adjust ) throws IOException {
82
+ MappedByteBuffer buffer = fileChannel .map (READ_ONLY , start , size );
83
+
84
+ var actualSize = ((int ) size );
85
+
86
+ if (adjust ) {
87
+ while (buffer .get (actualSize - 1 ) != '\n' ) {
88
+ actualSize --;
89
+ }
90
+ }
91
+
92
+ buffer .limit (actualSize );
93
+
94
+ return new BufferAndSize (buffer , actualSize );
95
+ }
96
+
97
+ private StationList aggregate (ByteBuffer buffer ) {
98
+ var measurements = new StationList ();
99
+
100
+ while (buffer .hasRemaining ()) {
101
+ int startPos = buffer .position ();
102
+
103
+ byte b ;
104
+ int hash = 0 ;
105
+ while ((b = buffer .get ()) != ';' ) {
106
+ hash = ((hash << 5 ) - hash ) + b ;
107
+ }
108
+
109
+ if (hash < 0 ) {
110
+ hash = -hash ;
111
+ }
112
+
113
+ int length = buffer .position () - startPos - 1 ;
114
+ byte [] station = new byte [length ];
115
+ buffer .get (startPos , station );
116
+
117
+ int value = readValue (buffer );
118
+
119
+ measurements .update (station , length , hash , value );
120
+ }
121
+
122
+ return measurements ;
123
+ }
124
+
125
+ /*
126
+ * Read decimal number from ascii characters (copied from arjenw)
127
+ *
128
+ * Example:
129
+ * If you have the decimal number 1.4,
130
+ * then byte 1 contain 49 (ascii code for '1')
131
+ * and byte 3 contain 52 (ascii code for '4')
132
+ * Subtract 480 + 48 (48 is the ASCII code for '0')
133
+ * to move number from ascii number to int
134
+ *
135
+ * 49 * 10 + 52 - 528 = 14
136
+ */
137
+ private static int readValue (ByteBuffer buffer ) {
138
+ int value ;
139
+ byte b1 = buffer .get ();
140
+ byte b2 = buffer .get ();
141
+ byte b3 = buffer .get ();
142
+ byte b4 = buffer .get ();
143
+
144
+ if (b2 == '.' ) {// value is n.n
145
+ value = (b1 * 10 + b3 - TWO_BYTE_TO_INT );
146
+ }
147
+ else {
148
+ if (b4 == '.' ) { // value is -nn.n
149
+ value = -(b2 * 100 + b3 * 10 + buffer .get () - THREE_BYTE_TO_INT );
150
+ }
151
+ else if (b1 == '-' ) { // value is -n.n
152
+ value = -(b2 * 10 + b4 - TWO_BYTE_TO_INT );
153
+ }
154
+ else { // value is nn.n
155
+ value = (b1 * 100 + b2 * 10 + b4 - THREE_BYTE_TO_INT );
156
+ }
157
+ buffer .get (); // new line
158
+ }
159
+ return value ;
160
+ }
161
+ }
162
+
163
+ class CustomMap {
164
+ private static final int SIZE = 1024 * 64 ;
165
+ private final Station [] stationList = new Station [SIZE ];
166
+
167
+ public void addOrUpdate (byte [] stationName , int length , int hash , int value ) {
168
+ int slot = hash & (SIZE - 1 );
169
+ var station = stationList [slot ];
170
+
171
+ while (station != null
172
+ && station .getHash () != hash
173
+ && !Arrays .equals (
174
+ station .getName (), 0 , station .getName ().length ,
175
+ stationName , 0 , length )) {
176
+
177
+ slot = (slot + 1 ) & (SIZE - 1 );
178
+ station = stationList [slot ];
179
+ }
180
+
181
+ if (station == null ) {
182
+ stationList [slot ] = new Station (stationName , hash );
183
+ }
184
+
185
+ stationList [slot ].add (value );
186
+ }
187
+
188
+ public Station get (byte [] stationName ) {
189
+ return stationList [findSlot (stationName )];
190
+ }
191
+
192
+ public void put (byte [] stationName , Station newStation ) {
193
+ stationList [findSlot (stationName )] = newStation ;
194
+ }
195
+
196
+ private int findSlot (byte [] stationName ) {
197
+ int hash = getHash (stationName );
198
+ int slot = hash & (SIZE - 1 );
199
+ var station = stationList [slot ];
200
+
201
+ while (station != null
202
+ && station .getHash () != hash
203
+ && !Arrays .equals (station .getName (), stationName )) {
204
+
205
+ slot = (slot + 1 ) & (SIZE - 1 );
206
+ station = stationList [slot ];
207
+ }
208
+
209
+ return slot ;
210
+ }
211
+
212
+ private int getHash (byte [] key ) {
213
+ int hash = 0 ;
214
+
215
+ for (byte b : key ) {
216
+ hash = hash * 31 + b ;
217
+ }
218
+
219
+ if (hash < 0 ) {
220
+ hash = -hash ;
221
+ }
222
+
223
+ return hash ;
224
+ }
225
+
226
+ public Set <Map .Entry <byte [], Station >> entrySet () {
227
+ var sorted = new HashMap <byte [], Station >();
228
+
229
+ for (var s : stationList ) {
230
+ if (s != null ) {
231
+ sorted .put (s .getName (), s );
232
+ }
233
+ }
234
+
235
+ return sorted .entrySet ();
236
+ }
237
+
238
+ public Map <String , Station > sorted () {
239
+ var sorted = new TreeMap <String , Station >();
240
+
241
+ for (var s : stationList ) {
242
+ if (s != null ) {
243
+ sorted .put (new String (s .getName (), StandardCharsets .UTF_8 ), s );
244
+ }
245
+ }
246
+
247
+ return sorted ;
248
+ }
249
+ }
250
+
251
+ class StationList {
252
+ private final CustomMap stations = new CustomMap ();
253
+
254
+ public void update (byte [] stationName , int length , int hash , int value ) {
255
+ stations .addOrUpdate (stationName , length , hash , value );
256
+ }
257
+
258
+ public StationList merge (StationList other ) {
259
+ for (var aggregator : other .stations .entrySet ()) {
260
+ var agg = stations .get (aggregator .getKey ());
261
+
262
+ if (agg == null ) {
263
+ stations .put (aggregator .getKey (), aggregator .getValue ());
264
+ }
265
+ else {
266
+ agg .merge (aggregator .getValue ());
267
+ }
268
+ }
269
+
270
+ return this ;
271
+ }
272
+
273
+ @ Override
274
+ public String toString () {
275
+ return stations .sorted ().toString ();
276
+ }
277
+ }
278
+
279
+ class Station {
280
+ private final byte [] name ;
281
+ private final int hash ;
282
+ private int min = Integer .MAX_VALUE ;
283
+ private int max = Integer .MIN_VALUE ;
284
+ private int sum ;
285
+ private int count ;
286
+
287
+ public Station (byte [] name , int hash ) {
288
+ this .name = name ;
289
+ this .hash = hash ;
290
+ }
291
+
292
+ public void add (int max , int min , int sum , int count ) {
293
+ this .max = Math .max (this .max , max );
294
+ this .min = Math .min (this .min , min );
295
+ this .sum += sum ;
296
+ this .count += count ;
297
+ }
298
+
299
+ public void add (int value ) {
300
+ this .max = Math .max (this .max , value );
301
+ this .min = Math .min (this .min , value );
302
+ this .sum += value ;
303
+ this .count ++;
304
+ }
305
+
306
+ public void merge (Station other ) {
307
+ this .max = Math .max (this .max , other .max );
308
+ this .min = Math .min (this .min , other .min );
309
+ this .sum += other .sum ;
310
+ this .count += other .count ;
311
+ }
312
+
313
+ public String toString () {
314
+ return (min / 10.0 ) + "/" + (Math .round (((double ) sum ) / count )) / 10.0 + "/" + (max / 10.0 );
315
+ }
316
+
317
+ public byte [] getName () {
318
+ return name ;
319
+ }
320
+
321
+ public int getHash () {
322
+ return hash ;
323
+ }
324
+ }
0 commit comments