Skip to content

Commit f598d74

Browse files
authored
Mahadev virtual thread 1brc (gunnarmorling#611)
* Read file with multiple virtual threads and process chunks of file data in parallel. * Updated logic to bucket every chunk of aggs into a vector and merge them into a TreeMap for printing. * Virtual Thread / File Channels Impl. * Renamed files with GHUsername. * Added statement to get vals before updating. * Added executable permission to the files.
1 parent f5bddaf commit f598d74

File tree

3 files changed

+191
-0
lines changed

3 files changed

+191
-0
lines changed

calculate_average_mahadev-k.sh

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/bin/sh
2+
#
3+
# Copyright 2023 The original authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
JAVA_OPTS=""
19+
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_mahadev_k

prepare_mahadev-k.sh

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/bash
2+
#
3+
# Copyright 2023 The original authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# Uncomment below to use sdk
19+
# source "$HOME/.sdkman/bin/sdkman-init.sh"
20+
# sdk use java 21.0.1-graal 1>&2
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2023 The original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package dev.morling.onebrc;
17+
18+
import java.io.FileDescriptor;
19+
import java.io.FileOutputStream;
20+
import java.io.IOException;
21+
import java.io.PrintStream;
22+
import java.io.RandomAccessFile;
23+
import java.io.UnsupportedEncodingException;
24+
import java.nio.ByteBuffer;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Map;
27+
import java.util.StringTokenizer;
28+
import java.util.concurrent.ConcurrentSkipListMap;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ThreadFactory;
31+
32+
public class CalculateAverage_mahadev_k {
33+
34+
private static final String FILE = "./measurements.txt";
35+
36+
private static Map<String, MeasurementAggregator> stationMap = new ConcurrentSkipListMap<>();
37+
38+
private static double round(double value) {
39+
return Math.round(value * 10.0) / 10.0;
40+
}
41+
42+
private static class MeasurementAggregator {
43+
double minima = Double.POSITIVE_INFINITY, maxima = Double.NEGATIVE_INFINITY, total = 0, count = 0;
44+
45+
public synchronized void accept(double value) {
46+
if (minima > value)
47+
minima = value;
48+
if (maxima < value)
49+
maxima = value;
50+
total += value;
51+
count++;
52+
}
53+
54+
public double min() {
55+
return round(minima);
56+
}
57+
58+
public double max() {
59+
return round(maxima);
60+
}
61+
62+
public double avg() {
63+
return round((Math.round(total * 10.0) / 10.0) / count);
64+
}
65+
}
66+
67+
public static void main(String[] args) throws IOException {
68+
int chunkSize = args.length == 1 ? Integer.parseInt(args[0]) : 1_000_000;
69+
readAndProcess(chunkSize);
70+
print();
71+
}
72+
73+
public static void readAndProcess(int chunkSize) {
74+
final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
75+
76+
try (RandomAccessFile file = new RandomAccessFile(FILE, "r")) {
77+
try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
78+
79+
var channel = file.getChannel();
80+
var size = channel.size();
81+
long start = 0;
82+
while (start <= size) {
83+
long end = start + chunkSize;
84+
String letter = "";
85+
do {
86+
end--;
87+
ByteBuffer buffer = ByteBuffer.allocate(1);
88+
channel.read(buffer, end);
89+
buffer.flip();
90+
letter = StandardCharsets.UTF_8.decode(buffer).toString();
91+
} while (!letter.equals("\n"));
92+
93+
if (end < start)
94+
end = start + chunkSize;
95+
96+
final long currentStart = start;
97+
final long currentEnd = end;
98+
executor.submit(() -> {
99+
ByteBuffer buffer = ByteBuffer.allocate((int) (currentEnd - currentStart + 1));
100+
try {
101+
channel.read(buffer, currentStart);
102+
}
103+
catch (IOException e) {
104+
e.printStackTrace();
105+
}
106+
buffer.flip();
107+
String data = StandardCharsets.UTF_8.decode(buffer).toString();
108+
processData(data);
109+
});
110+
start = end + 1;
111+
}
112+
}
113+
114+
}
115+
catch (IOException e) {
116+
e.printStackTrace();
117+
}
118+
}
119+
120+
public static void processData(String dataBlock) {
121+
StringTokenizer tokenizer = new StringTokenizer(dataBlock, "\n");
122+
while (tokenizer.hasMoreElements()) {
123+
StringTokenizer tokens = new StringTokenizer(tokenizer.nextToken(), ";");
124+
String station = tokens.nextToken();
125+
double value = Double.parseDouble(tokens.nextToken());
126+
processMinMaxMean(station, value);
127+
}
128+
}
129+
130+
private static void processMinMaxMean(String station, double temp) {
131+
var values = stationMap.get(station);
132+
if (values == null) {
133+
values = new MeasurementAggregator();
134+
stationMap.putIfAbsent(station, values);
135+
}
136+
values = stationMap.get(station);
137+
values.accept(temp);
138+
}
139+
140+
public static void print() throws UnsupportedEncodingException {
141+
System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out), true, StandardCharsets.UTF_8));
142+
System.out.print("{");
143+
int i = stationMap.size();
144+
for (var kv : stationMap.entrySet()) {
145+
System.out.printf("%s=%s/%s/%s", kv.getKey(), kv.getValue().min(), kv.getValue().avg(), kv.getValue().max());
146+
if (i > 1)
147+
System.out.print(", ");
148+
i--;
149+
}
150+
System.out.println("}");
151+
}
152+
}

0 commit comments

Comments
 (0)