Skip to content

take full advantage of the advantage of the concurrency of FileChannel #560

Open
@jixuan1989

Description

@jixuan1989

Because File Channel supports concurrency read. we can use this feature and decouple the file read module.

The attachment is a test for FileChannel:

  1. File Channel native concurrent read;
  2. MappedByteBuffer
  3. Open many file channels for one file.

Results show that File Channel's native concurrency is good enough.

import org.apache.commons.lang3.RandomStringUtils;
import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Micro-benchmark that compares three ways of reading fixed-size records of one
 * large file from a pool of worker threads:
 * <ol>
 *   <li>a single {@link MappedByteBuffer} shared by all workers,</li>
 *   <li>a single {@link FileChannel} shared by all workers (positional reads),</li>
 *   <li>one dedicated {@link FileChannel} per task.</li>
 * </ol>
 * Each worker re-reads one record and prints it if it differs from the reference
 * copy that {@code main} read sequentially up front. The elapsed wall-clock time
 * of every variant is printed in milliseconds.
 *
 * <p>The input file is expected to hold at least 2000 * 1,000,000 bytes; it can be
 * generated with {@link #write()}.
 */
public class Test2 {
    static String filename = "test.txt";

    /** Number of bytes in one record read by a worker. */
    private static final int RECORD_SIZE = 1000;
    /** Number of distinct records that are read and verified. */
    private static final int RECORD_COUNT = 2000;
    /** Size of every worker thread pool. */
    private static final int THREADS = 100;
    /** Number of bytes mapped into memory (and written by {@link #write()}). */
    private static final long MAPPED_BYTES = 2000L * 1000000L;
    /** Charset used for every byte/String conversion in this class. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /**
     * Runs the three read variants one after another and prints their timings.
     *
     * @param args ignored
     * @throws IOException if the file cannot be opened, mapped or read
     * @throws InterruptedException if interrupted while waiting for a worker pool
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        //write();
        int round1 = 5;
        Path path = Paths.get(filename);
        try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
            // Reference copies, read sequentially; workers compare against these.
            String[] result = new String[RECORD_COUNT];
            for (int i = 0; i < result.length; i++) {
                result[i] = read1000(i * RECORD_SIZE, fileChannel);
            }

            long time1 = System.currentTimeMillis();
            MappedByteBuffer data = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, MAPPED_BYTES);
            System.out.println("map 2GB data, time cost:" + (System.currentTimeMillis() - time1));
            try {
                //for mapped byte buffer
                ExecutorService service2 = Executors.newFixedThreadPool(THREADS);
                long time2 = System.currentTimeMillis();
                for (int i = 0; i < round1; i++) {
                    for (int j = 0; j < RECORD_COUNT; j++) {
                        // execute (not submit): a task failure must be reported by the
                        // worker thread instead of vanishing in an unread Future.
                        service2.execute(new MyThread2(j, data, result[j]));
                    }
                }
                shutdownAndAwait(service2);
                System.out.println((System.currentTimeMillis() - time2));

                //for concurrent file channel
                ExecutorService service = Executors.newFixedThreadPool(THREADS);
                long time = System.currentTimeMillis();
                for (int i = 0; i < round1; i++) {
                    for (int j = 0; j < RECORD_COUNT; j++) {
                        service.execute(new MyThread(j, fileChannel, result[j]));
                    }
                }
                shutdownAndAwait(service);
                System.out.println((System.currentTimeMillis() - time));

                //for open many file channels
                // Exactly one channel per task, so no slot is left null.
                FileChannel[] channels = new FileChannel[round1 * RECORD_COUNT];
                try {
                    for (int i = 0; i < channels.length; i++) {
                        channels[i] = FileChannel.open(path, StandardOpenOption.READ);
                    }
                    ExecutorService service3 = Executors.newFixedThreadPool(THREADS);
                    long time3 = System.currentTimeMillis();
                    for (int i = 0; i < round1; i++) {
                        for (int j = 0; j < RECORD_COUNT; j++) {
                            service3.execute(new MyThread(j, channels[i * RECORD_COUNT + j], result[j]));
                        }
                    }
                    shutdownAndAwait(service3);
                    System.out.println((System.currentTimeMillis() - time3));
                } finally {
                    // Opening may have failed part-way; close only what was opened.
                    for (FileChannel channel : channels) {
                        if (channel != null) {
                            channel.close();
                        }
                    }
                }
            } finally {
                // Release the mapping explicitly, even when a variant above failed.
                time1 = System.currentTimeMillis();
                Cleaner cl = ((DirectBuffer) data).cleaner();
                if (cl != null) {
                    cl.clean();
                }
                System.out.println("clean 2GB data, time cost:" + (System.currentTimeMillis() - time1));
            }
        }
    }

    /**
     * Stops accepting new tasks and blocks until all queued tasks have finished.
     *
     * @param service the pool to drain
     * @throws InterruptedException if interrupted while waiting
     */
    private static void shutdownAndAwait(ExecutorService service) throws InterruptedException {
        service.shutdown();
        service.awaitTermination(1000000L, TimeUnit.SECONDS);
    }

    /**
     * Reads 1000 bytes from the channel at an absolute position and decodes them as UTF-8.
     * The channel's own position is not modified.
     *
     * @param offset absolute byte position in the file of the first byte to read
     * @param fileChannel channel to read from
     * @return the 1000 bytes decoded as a UTF-8 string
     * @throws IOException if the read fails or the file ends before 1000 bytes were read
     */
    public static String read1000(int offset, FileChannel fileChannel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(RECORD_SIZE);
        // A single positional read may return fewer bytes than requested, so keep
        // reading from the advancing position until the buffer is full.
        while (buffer.hasRemaining()) {
            int n = fileChannel.read(buffer, (long) offset + buffer.position());
            if (n < 0) {
                throw new IOException("Unexpected end of file: wanted " + RECORD_SIZE
                        + " bytes at offset " + offset + ", got " + buffer.position());
            }
        }
        return new String(buffer.array(), UTF8);
    }

    /**
     * Copies 1000 bytes out of a buffer at an absolute position and decodes them as UTF-8.
     * The position, limit and mark of {@code data} itself are not modified.
     *
     * @param offset absolute index in {@code data} of the first byte to copy
     * @param data buffer to read from; may be shared between threads
     * @return the 1000 bytes decoded as a UTF-8 string
     * @throws IOException never thrown by this implementation; declared for callers
     */
    public static String read1000fromByteBuffer(int offset, ByteBuffer data) throws IOException {
        byte[] bytes = new byte[RECORD_SIZE];
        // Work on a duplicate so the shared buffer's position is untouched.
        ByteBuffer view = data.duplicate();
        // get(byte[], int, int) takes an offset into the destination array, not into
        // the buffer, so the source position has to be set on the view.
        view.position(offset);
        view.get(bytes, 0, RECORD_SIZE);
        return new String(bytes, UTF8);
    }

    /**
     * Appends 2000 chunks of 1,000,000 random alphabetic characters to the test file,
     * creating the file if it does not exist, and prints the index of each chunk written.
     *
     * @throws IOException if the file cannot be opened or written
     */
    public static void write() throws IOException {
        Path path = Paths.get(filename);
        // CREATE makes the file when it is missing; APPEND opens it for writing at the end.
        try (FileChannel fileChannel =
                     FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < 2000; i++) {
                ByteBuffer chunk = ByteBuffer.wrap(RandomStringUtils.randomAlphabetic(1000000).getBytes(UTF8));
                // write may consume only part of the buffer; loop until it is drained.
                while (chunk.hasRemaining()) {
                    fileChannel.write(chunk);
                }
                System.out.println(i);
            }
        }
    }


    /**
     * Task that reads record {@code num} through a {@link FileChannel} and prints the
     * record if it differs from the expected content.
     */
    public static class MyThread implements Runnable {
        int num;
        FileChannel channel;
        String result;

        /**
         * @param num index of the 1000-byte record to read
         * @param channel channel to read the record from
         * @param result expected content of the record
         */
        MyThread(int num, FileChannel channel, String result) {
            this.num = num;
            this.channel = channel;
            this.result = result;
        }

        @Override
        public void run() {
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000(num * RECORD_SIZE, channel);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * for mapped byte buffer: task that reads record {@code num} from a shared
     * {@link ByteBuffer} and prints the record if it differs from the expected content.
     */
    public static class MyThread2 implements Runnable {
        int num;
        ByteBuffer data;
        String result;

        /**
         * @param num index of the 1000-byte record to read
         * @param data buffer to read the record from
         * @param result expected content of the record
         */
        MyThread2(int num, ByteBuffer data, String result) {
            this.num = num;
            this.data = data;
            this.result = result;
        }

        @Override
        public void run() {
            try {
                //Thread.sleep(new Random().nextInt(5));
                String line = read1000fromByteBuffer(num * RECORD_SIZE, data);
                if (!line.equals(result)) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Maven build for the Test2 FileChannel read benchmark. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Placeholder coordinates; this project is a throwaway test harness. -->
    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <!-- commons-lang3 supplies RandomStringUtils, which Test2 imports to generate the test file. -->
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>

</dependencies>
<build>
    <plugins>
        <!-- Compile with Java 8 source and target levels. -->
        <!-- NOTE(review): Test2 imports sun.misc.Cleaner, which is presumably only available on a Java 8 JDK; confirm before raising these levels. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>

</build>
</project>

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions