Open
Description
Because FileChannel supports concurrent reads, we can use this feature to decouple the file read module.
The attached test compares three ways of reading one file concurrently:
- File Channel native concurrent read;
- MappedByteBuffer
- Open many file channels for one file.
Results show that File Channel's native concurrency is good enough.
import org.apache.commons.lang3.RandomStringUtils;
import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Micro-benchmark comparing three ways of reading fixed-size records from one file
 * with a pool of worker threads:
 * <ol>
 *   <li>a single {@link MappedByteBuffer} shared by all workers,</li>
 *   <li>a single {@link FileChannel} shared by all workers (positional reads),</li>
 *   <li>one dedicated {@link FileChannel} per task.</li>
 * </ol>
 * Every task re-reads one record and prints it if it differs from the copy that was
 * read single-threaded at start-up, so any output besides the timings indicates a
 * corrupted concurrent read.
 */
public class Test2 {
    static String filename = "test.txt";

    /** Size in bytes of one record that a task reads and verifies. */
    private static final int RECORD_SIZE = 1000;
    /** Number of distinct records verified per round. */
    private static final int RECORD_COUNT = 2000;
    /** How many times every record is re-read in each benchmark. */
    private static final int ROUNDS = 5;
    /** Worker threads per benchmark. */
    private static final int POOL_THREADS = 100;
    /** Length of the mapped region; computed as a long so the product can never overflow int. */
    private static final long MAPPED_BYTES = 2000L * 1000000L;
    /** Charset used for every byte/String conversion in this class. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Runs the three benchmarks in sequence and prints their wall-clock times in milliseconds.
     *
     * @param args unused
     * @throws IOException if the test file cannot be opened, mapped or read
     * @throws InterruptedException if the main thread is interrupted while waiting for the workers
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Call write() once beforehand to generate the test file.
        //write();
        Path path = Paths.get(filename);
        // try-with-resources: the shared channel is closed even when a benchmark fails.
        try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
            // Reference copies of every record, read single-threaded.
            String[] result = new String[RECORD_COUNT];
            for (int i = 0; i < result.length; i++) {
                result[i] = read1000(i * RECORD_SIZE, fileChannel);
            }
            int taskCount = ROUNDS * RECORD_COUNT;

            // for mapped byte buffer
            long mapStart = System.currentTimeMillis();
            MappedByteBuffer data = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, MAPPED_BYTES);
            System.out.println("map 2GB data, time cost:" + (System.currentTimeMillis() - mapStart));
            Runnable[] mappedTasks = new Runnable[taskCount];
            for (int i = 0; i < taskCount; i++) {
                int record = i % RECORD_COUNT;
                mappedTasks[i] = new MyThread2(record, data, result[record]);
            }
            System.out.println(runTimed(mappedTasks));

            // for concurrent file channel
            Runnable[] sharedChannelTasks = new Runnable[taskCount];
            for (int i = 0; i < taskCount; i++) {
                int record = i % RECORD_COUNT;
                sharedChannelTasks[i] = new MyThread(record, fileChannel, result[record]);
            }
            System.out.println(runTimed(sharedChannelTasks));

            // for open many file channels
            // Sized to exactly the number of channels that get opened; the original
            // over-allocated the array and then hit null entries when closing it.
            FileChannel[] channels = new FileChannel[taskCount];
            try {
                for (int i = 0; i < channels.length; i++) {
                    channels[i] = FileChannel.open(path, StandardOpenOption.READ);
                }
                Runnable[] ownChannelTasks = new Runnable[taskCount];
                for (int i = 0; i < taskCount; i++) {
                    int record = i % RECORD_COUNT;
                    ownChannelTasks[i] = new MyThread(record, channels[i], result[record]);
                }
                System.out.println(runTimed(ownChannelTasks));
            } finally {
                // Entries stay null if opening failed part-way through.
                for (FileChannel channel : channels) {
                    if (channel != null) {
                        channel.close();
                    }
                }
            }

            // Explicit unmap. Deliberately NOT in a finally block: it must only happen
            // after every worker has finished, because touching an unmapped buffer can
            // crash the JVM.
            long cleanStart = System.currentTimeMillis();
            Cleaner cl = ((DirectBuffer) data).cleaner();
            if (cl != null) {
                cl.clean();
            }
            System.out.println("clean 2GB data, time cost:" + (System.currentTimeMillis() - cleanStart));
        }
    }

    /**
     * Executes all tasks on a fresh fixed-size pool and waits for them to finish.
     * Tasks are started with {@code execute} rather than {@code submit} so that an
     * unchecked exception thrown by a task reaches the worker thread's uncaught-exception
     * handler instead of being captured in a discarded {@code Future}.
     *
     * @param tasks the tasks to run
     * @return elapsed wall-clock time in milliseconds, from first submission to completion
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    private static long runTimed(Runnable[] tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(POOL_THREADS);
        long start = System.currentTimeMillis();
        try {
            for (Runnable task : tasks) {
                pool.execute(task);
            }
        } finally {
            pool.shutdown();
        }
        pool.awaitTermination(1000000L, TimeUnit.SECONDS);
        return System.currentTimeMillis() - start;
    }

    /**
     * Reads up to 1000 bytes from the channel starting at the given absolute position
     * and decodes them as UTF-8. The channel's own position is not used.
     *
     * @param offset absolute byte position in the file at which to start reading
     * @param fileChannel channel to read from
     * @return the decoded bytes; shorter than 1000 bytes only if end of file was reached
     * @throws IOException if the read fails
     */
    public static String read1000(int offset, FileChannel fileChannel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(RECORD_SIZE);
        long position = offset;
        // A single read may return fewer bytes than requested, so keep reading
        // until the buffer is full or end of file is reported.
        while (buffer.hasRemaining()) {
            int count = fileChannel.read(buffer, position);
            if (count < 0) {
                break;
            }
            position += count;
        }
        return new String(buffer.array(), 0, buffer.position(), UTF_8);
    }

    /**
     * Reads up to 1000 bytes from the buffer starting at the given absolute position
     * and decodes them as UTF-8. The buffer passed in is not modified.
     *
     * @param offset absolute byte position in {@code data} at which to start reading
     * @param data buffer to read from
     * @return the decoded bytes; shorter than 1000 bytes only if the buffer ends first
     * @throws IOException never thrown by this implementation; declared so existing callers keep compiling
     * @throws IllegalArgumentException if {@code offset} is negative or beyond the buffer's limit
     */
    public static String read1000fromByteBuffer(int offset, ByteBuffer data) throws IOException {
        // duplicate() shares the content but has its own position, so concurrent
        // callers do not disturb each other.
        ByteBuffer view = data.duplicate();
        // The (dst, offset, length) overload of get() indexes into the destination
        // array, not the buffer, so the read position has to be set explicitly.
        view.position(offset);
        byte[] bytes = new byte[RECORD_SIZE];
        int length = Math.min(bytes.length, view.remaining());
        view.get(bytes, 0, length);
        return new String(bytes, 0, length, UTF_8);
    }

    /**
     * Appends 2000 chunks of 1,000,000 random alphabetic characters to the test file,
     * creating the file if it does not exist, and prints the index of each chunk written.
     *
     * @throws IOException if the file cannot be opened or written
     */
    public static void write() throws IOException {
        Path path = Paths.get(filename);
        // CREATE already creates a missing file, so no separate exists/createFile step is needed.
        try (FileChannel fileChannel =
                     FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < 2000; i++) {
                ByteBuffer chunk = ByteBuffer.wrap(RandomStringUtils.randomAlphabetic(1000000).getBytes(UTF_8));
                // write() is not required to consume the whole buffer in one call.
                while (chunk.hasRemaining()) {
                    fileChannel.write(chunk);
                }
                System.out.println(i);
            }
        }
    }

    /**
     * Task that re-reads one record through a {@link FileChannel} and prints it
     * if it differs from the expected content.
     */
    public static class MyThread implements Runnable {
        /** Index of the record to read. */
        int num;
        /** Channel to read the record from. */
        FileChannel channel;
        /** Expected content of the record. */
        String result;

        MyThread(int num, FileChannel channel, String result) {
            this.num = num;
            this.channel = channel;
            this.result = result;
        }

        @Override
        public void run() {
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000(num * RECORD_SIZE, channel);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * for mapped byte buffer: task that re-reads one record from a shared
     * {@link ByteBuffer} and prints it if it differs from the expected content.
     */
    public static class MyThread2 implements Runnable {
        /** Index of the record to read. */
        int num;
        /** Buffer to read the record from. */
        ByteBuffer data;
        /** Expected content of the record. */
        String result;

        MyThread2(int num, ByteBuffer data, String result) {
            this.num = num;
            this.data = data;
            this.result = result;
        }

        @Override
        public void run() {
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000fromByteBuffer(num * RECORD_SIZE, data);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Maven build for the Test2 FileChannel read benchmark. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test</groupId>
<artifactId>test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- commons-lang3 supplies RandomStringUtils, which Test2.write() uses to generate the test file. -->
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile for Java 8 source and bytecode level. -->
<!-- NOTE(review): Test2 imports sun.misc.Cleaner and sun.nio.ch.DirectBuffer, which are
     JDK-internal classes; presumably the build has to run on a JDK 8 toolchain - confirm. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Metadata
Metadata
Assignees
Labels
No labels