Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

take full advantage of the advantage of the concurrency of FileChannel #560

Open
jixuan1989 opened this issue Jan 3, 2019 · 1 comment

Comments

@jixuan1989
Copy link
Member

Because File Channel supports concurrency read. we can use this feature and decouple the file read module.

The attachment is a test for FileChannel:

  1. File Channel native concurrent read;
  2. MappedByteBuffer
  3. Open many file channels for one file.

Results show that File Channel's native concurrency is good enough.

import org.apache.commons.lang3.RandomStringUtils;
import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test2 {
    static String filename = "test.txt";
    public static void main(String[] args) throws IOException, InterruptedException {
        //write();
        int round1 = 5;
        Path path = Paths.get(filename);
        FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ);
        String[] result=new String[2000];
        for(int i=0; i<result.length; i++){
            result[i]= read1000(i*1000,fileChannel);
        }


        //for mapped byte buffer
        long time1 = System.currentTimeMillis();
        MappedByteBuffer data = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 2000*1000000);
        System.out.println("map 2GB data, time cost:"+ (System.currentTimeMillis()-time1));
        ExecutorService service2 = Executors.newFixedThreadPool(100);
        long time2 = System.currentTimeMillis();
        for(int i=0; i< round1; i++) {
            for(int j=0; j<2000; j++) {
                service2.submit(new MyThread2(j, data, result[j]));
            }
        }
        service2.shutdown();
        service2.awaitTermination(1000000L, TimeUnit.SECONDS);
        System.out.println((System.currentTimeMillis()-time2));


        //for concurrent file channel
        ExecutorService service = Executors.newFixedThreadPool(100);
        long time = System.currentTimeMillis();
        for(int i=0; i< round1; i++) {
            for(int j=0; j<2000; j++) {
                service.submit(new MyThread(j, fileChannel, result[j]));
            }
        }
        service.shutdown();
        service.awaitTermination(1000000L, TimeUnit.SECONDS);
        System.out.println((System.currentTimeMillis()-time));

        //for open many file channels
        ExecutorService service3 = Executors.newFixedThreadPool(100);
        FileChannel[] channels = new FileChannel[100*2000];
        for(int i=0; i< round1; i++) {
            for(int j=0; j<2000; j++) {
                channels[i*2000+j] = FileChannel.open(path, StandardOpenOption.READ);
            }
        }
        long time3 = System.currentTimeMillis();
        for(int i=0; i< round1; i++) {
            for(int j=0; j<2000; j++) {
                service3.submit(new MyThread(j, channels[i*2000+j], result[j]));
            }
        }
        service3.shutdown();
        service3.awaitTermination(1000000L, TimeUnit.SECONDS);
        System.out.println((System.currentTimeMillis()-time3));
        for(FileChannel channel: channels){
            channel.close();
        }


        time1 = System.currentTimeMillis();
        Cleaner cl = ((DirectBuffer)data).cleaner();
        if (cl != null) {
            cl.clean();
        }
        System.out.println("clean 2GB data, time cost:"+ (System.currentTimeMillis()-time1));

        fileChannel.close();
    }

    public static String read1000(int offset, FileChannel fileChannel)throws IOException{
        ByteBuffer buffer = ByteBuffer.allocate(1000);
        fileChannel.read(buffer,offset);
        buffer.flip();
        byte[] bytes=new byte[1000];
        buffer.get(bytes, 0, 1000);
        return new String( bytes, Charset.forName("UTF-8") );
    }

    public static String read1000fromByteBuffer(int offset, ByteBuffer data)throws IOException{
        byte[] bytes=new byte[1000];
        data.duplicate().get(bytes, offset, 1000 );
        return new String( bytes, Charset.forName("UTF-8") );
    }

    public static void write() throws IOException {
        Path path = Paths.get(filename);
        if(!Files.exists(path)){
            Files.createFile(path);
        }
        FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        for(int i = 0 ; i <2000; i++) {
            fileChannel.write(ByteBuffer.wrap(RandomStringUtils.randomAlphabetic(1000000).getBytes()));
            System.out.println(i);
        }
        fileChannel.close();
    }


    public static class MyThread implements  Runnable{
        int num;
        FileChannel channel;
        String result;
        MyThread(int num, FileChannel channel, String result){
            this.num=num;
            this.channel=channel;
            this.result=result;
        }
        @Override
        public void run(){
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000(num * 1000, channel);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * for mapped byte buffer
     */
    public static class MyThread2 implements  Runnable{
        int num;
        ByteBuffer data;
        String result;
        MyThread2(int num, ByteBuffer data, String result){
            this.num=num;
            this.data=data;
            this.result=result;
        }
        @Override
        public void run(){
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000fromByteBuffer(num * 1000, data);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>

</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>

</build>
</project>
@jixuan1989
Copy link
Member Author

@Beyyes @liukun4515

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant