Skip to content

Commit aee71b9

Browse files
My own solution -- memory mapping the files, running in parallel threads, using a state machine to parse the file (gunnarmorling#466)
* Golang implementation * Speed up by avoiding copying the lines * Memory mapping * Add script for testing * Now passing most of the tests * Refactor to composed method * Now using integer math throughout * Now using a state machine for parsing! * Refactoring state names * Enabling profiling * Running in parallel! * Fully parallel! * Refactor * Improve type safety of methods * The rounding problem is due to difference between Javas and Gos printf implementation * Converting my solution to Java * Merging results * Splitting the file in several buffers * Made it parallel! * Removed test file * Removed go implementation * Removed unused files * Add header to .sh file --------- Co-authored-by: Matteo Vaccari <[email protected]>
1 parent 0854152 commit aee71b9

File tree

2 files changed

+281
-0
lines changed

2 files changed

+281
-0
lines changed

calculate_average_xpmatteo.sh

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/sh
2+
#
3+
# Copyright 2023 The original authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
JAVA_OPTS="--enable-preview"
19+
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_xpmatteo
20+
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* Copyright 2023 The original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package dev.morling.onebrc;
17+
18+
import java.io.File;
19+
import java.io.IOException;
20+
import java.io.RandomAccessFile;
21+
import java.nio.ByteBuffer;
22+
import java.nio.channels.FileChannel;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Objects;
28+
import java.util.TreeMap;
29+
import java.util.stream.Collectors;
30+
31+
@SuppressWarnings({ "ReassignedVariable", "StatementWithEmptyBody" })
32+
public class CalculateAverage_xpmatteo {
33+
34+
private static final String FILE = "./measurements.txt";
35+
36+
public static void main(String[] args) throws IOException, InterruptedException {
37+
var fileName = dataFileName(args);
38+
39+
try (
40+
var file = new RandomAccessFile(new File(fileName), "r");
41+
var channel = file.getChannel()) {
42+
var numCpus = Runtime.getRuntime().availableProcessors();
43+
var threads = split(channel, numCpus).stream()
44+
.map(Worker::new)
45+
.toList();
46+
threads.forEach(Thread::start);
47+
for (Worker thread : threads) {
48+
thread.join();
49+
}
50+
var results = threads.stream().map(Worker::getResults)
51+
.reduce(CalculateAverage_xpmatteo::merge)
52+
.orElseThrow();
53+
printCities(results);
54+
}
55+
}
56+
57+
public static class Worker extends Thread {
58+
private final ByteBuffer buffer;
59+
private Results results;
60+
61+
public Worker(ByteBuffer buffer) {
62+
this.buffer = buffer;
63+
}
64+
65+
@Override
66+
public void run() {
67+
this.results = parseData(this.buffer);
68+
}
69+
70+
public Results getResults() {
71+
return results;
72+
}
73+
}
74+
75+
protected static List<ByteBuffer> split(FileChannel channel, int numCpus) throws IOException {
76+
if (channel.size() < 10_000) {
77+
return List.of(channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size()));
78+
}
79+
80+
long[] increments = new long[numCpus + 1];
81+
for (int i = 0; i < numCpus; i++) {
82+
increments[i] = i * channel.size() / numCpus;
83+
// adjust the increments so that they start on the beginning of a city
84+
while (increments[i] > 0 && byteAt(channel, increments[i] - 1) != '\n') {
85+
increments[i]--;
86+
}
87+
}
88+
increments[numCpus] = channel.size();
89+
90+
List<ByteBuffer> result = new ArrayList<>(numCpus);
91+
for (int i = 0; i < numCpus; i++) {
92+
long from = increments[i];
93+
long to = increments[i + 1];
94+
result.add(channel.map(FileChannel.MapMode.READ_ONLY, from, to - from));
95+
}
96+
return result;
97+
}
98+
99+
private static byte byteAt(FileChannel channel, long offset) throws IOException {
100+
ByteBuffer buf = ByteBuffer.allocate(1);
101+
channel.position(offset);
102+
channel.read(buf);
103+
buf.flip();
104+
var bytes = new byte[1];
105+
buf.get(bytes);
106+
return bytes[0];
107+
}
108+
109+
public static String dataFileName(String[] args) {
110+
if (args.length == 1) {
111+
return args[0];
112+
}
113+
return FILE;
114+
}
115+
116+
protected static byte[] readAllData(String fileName) throws IOException {
117+
return Files.readAllBytes(Path.of(fileName));
118+
}
119+
120+
protected static ByteBuffer memoryMap(String fileName) throws IOException {
121+
try (RandomAccessFile file = new RandomAccessFile(new File(fileName), "r")) {
122+
// Get file channel in read-only mode
123+
FileChannel fileChannel = file.getChannel();
124+
125+
return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
126+
}
127+
}
128+
129+
protected enum State {
130+
PARSING_CITY_NAME,
131+
SKIPPING_SEMICOLON,
132+
PARSING_TEMPERATURE
133+
}
134+
135+
protected static Results parseData(ByteBuffer data) {
136+
var results = new Results();
137+
var state = State.PARSING_CITY_NAME;
138+
int cityStartOffset = 0, cityEndOffset = 0;
139+
int temp = 0, sign = 0;
140+
141+
for (int i = 0; i < data.limit(); i++) {
142+
byte currentChar = data.get();
143+
if (state == State.PARSING_CITY_NAME && currentChar == ';') {
144+
state = State.SKIPPING_SEMICOLON;
145+
cityEndOffset = i;
146+
}
147+
else if (state == State.PARSING_CITY_NAME) {
148+
// do nothing
149+
}
150+
else if (state == State.SKIPPING_SEMICOLON && currentChar == '-') {
151+
state = State.PARSING_TEMPERATURE;
152+
temp = 0;
153+
sign = -1;
154+
}
155+
else if (state == State.SKIPPING_SEMICOLON && currentChar >= '0' && currentChar <= '9') {
156+
state = State.PARSING_TEMPERATURE;
157+
temp = currentChar - '0';
158+
sign = 1;
159+
}
160+
else if (state == State.PARSING_TEMPERATURE && currentChar >= '0' && currentChar <= '9') {
161+
temp = temp * 10 + currentChar - '0';
162+
}
163+
else if (state == State.PARSING_TEMPERATURE && currentChar == '.') {
164+
// do nothing
165+
}
166+
else if (state == State.PARSING_TEMPERATURE && currentChar == '\n') {
167+
byte[] bytes = new byte[cityEndOffset - cityStartOffset];
168+
data.get(cityStartOffset, bytes);
169+
var cityName = new String(bytes);
170+
accumulate(results, cityName, temp * sign);
171+
state = State.PARSING_CITY_NAME;
172+
cityStartOffset = i + 1;
173+
}
174+
}
175+
176+
return results;
177+
}
178+
179+
private static void accumulate(Results results, String cityName, int tempTimesTen) {
180+
var existing = results.get(cityName);
181+
if (existing == null) {
182+
results.put(cityName, new CityData(tempTimesTen, tempTimesTen, tempTimesTen, 1));
183+
}
184+
else {
185+
existing.min = Math.min(existing.min, tempTimesTen);
186+
existing.sum = existing.sum + tempTimesTen;
187+
existing.max = Math.max(existing.max, tempTimesTen);
188+
existing.count++;
189+
}
190+
}
191+
192+
protected static Results merge(Results a, Results b) {
193+
for (var entry : b.entrySet()) {
194+
CityData valueInA = a.get(entry.getKey());
195+
if (null == valueInA) {
196+
a.put(entry.getKey(), entry.getValue());
197+
}
198+
else {
199+
var valueInB = entry.getValue();
200+
valueInA.min = Math.min(valueInA.min, valueInB.min);
201+
valueInA.sum += valueInB.sum;
202+
valueInA.max = Math.max(valueInA.max, valueInB.max);
203+
valueInA.count += valueInB.count;
204+
}
205+
}
206+
207+
return a;
208+
}
209+
210+
protected static class Results extends TreeMap<String, CityData> {
211+
212+
}
213+
214+
protected static class CityData {
215+
int min, sum, max, count;
216+
217+
public CityData(int min, int sum, int max, int count) {
218+
this.min = min;
219+
this.sum = sum;
220+
this.max = max;
221+
this.count = count;
222+
}
223+
224+
@Override
225+
public boolean equals(Object o) {
226+
if (this == o)
227+
return true;
228+
if (o == null || getClass() != o.getClass())
229+
return false;
230+
CityData cityData = (CityData) o;
231+
return min == cityData.min && sum == cityData.sum && max == cityData.max && count == cityData.count;
232+
}
233+
234+
@Override
235+
public int hashCode() {
236+
return Objects.hash(min, sum, max, count);
237+
}
238+
239+
@Override
240+
public String toString() {
241+
return STR."CityData{min=\{min}, sum=\{sum}, max=\{max}, count=\{count}\{'}'}";
242+
}
243+
}
244+
245+
protected static void printCities(Results cities) {
246+
System.out.print("{");
247+
for (String city : cities.keySet()) {
248+
CityData data = cities.get(city);
249+
var min = data.min / 10.0;
250+
var mean = (data.sum * 10.0 / data.count) / 100.0;
251+
var max = data.max / 10.0;
252+
System.out.printf(
253+
"%s=%.1f/%.1f/%.1f, ",
254+
city,
255+
min,
256+
mean,
257+
max);
258+
}
259+
System.out.print("}");
260+
}
261+
}

0 commit comments

Comments
 (0)