Simple Single-Threaded Reactive TCP class using RxJava and NIO

RightMesh/librxtcp

librxtcp

RxTCP is a small single-threaded TCP library built on Java NIO and RxJava. You can add it to your project with Gradle through JitPack:

repositories {
    maven { url 'https://jitpack.io' }
}
dependencies {
    implementation 'com.github.RightMesh:librxtcp:master-SNAPSHOT'
}

create a RxTCP.Server

To run a TCP server on a given port just do the following:

RxTCP.Server<RxTCP.Connection> s = new RxTCP.Server<>(port);
s.start().subscribe(
        connection -> {
            /* connection is a new RxTCP.Connection client */
        },
        e -> {
            /* the server has stopped because of an error */
        },
        () -> {
            /* the server has stopped */
        }
        );

You can also give the server a factory. The factory must create the RxTCP.Connection objects, for example instances of your own connection class:

RxTCP.Server<MyOwnTCPConnection> s = new RxTCP.Server<>(port, MyOwnTCPConnection::new);
s.start().subscribe(
        connection -> {
            /* connection is a new client socket of type MyOwnTCPConnection */
        },
        e -> {
            /* the server has stopped because of an error */
        },
        () -> {
            /* the server has stopped */
        }
        );

create a client with RxTCP.ConnectionRequest

To connect to a TCP server, do the following:

    new RxTCP.ConnectionRequest<>(inetAddress.getHostAddress(), port)
    .connect()
    .subscribe(
            connection -> {
                /* connection socket of type RxTCP.Connection */
            },
            e -> {
                /* connection failed */
            });

Similarly, you can also provide a factory:

    new RxTCP.ConnectionRequest<>(inetAddress.getHostAddress(), port, MyOwnTCPConnection::new)
    .connect()
    .subscribe(
            connection -> {
                /* connection socket of type MyOwnTCPConnection */
            },
            e -> {
                /* connection failed */
            });

send and recv data using RxTCP.Connection

Receiving ByteBuffer

You can receive data using recv():

con.recv().subscribe(
        byteBuffer -> {
            /* new ByteBuffer received */
        },
        e -> {
            /* connection has closed because of an error */
        },
        () -> {
            /* connection has closed */
        });

A few points:

  • The same ByteBuffer is reused for every received packet, so you must not modify it outside of this thread. Copy the bytes if you need them after the callback returns.
  • The ByteBuffer callback runs on the single NIO thread, so keep any processing here minimal and return as soon as possible to avoid slowing down the other sockets.
  • When recv terminates, the TCP connection has closed.
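
Because the receive buffer is reused, data that must outlive the callback has to be copied first. The following is a minimal sketch of such a defensive copy using only the JDK. The class name `BufferCopy` and the method `copyOf` are hypothetical helpers for illustration and are not part of RxTCP.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of RxTCP): copies a reused buffer's content. */
public final class BufferCopy {
    private BufferCopy() {}

    /** Returns an independent copy of the readable bytes of src without moving src's position. */
    public static ByteBuffer copyOf(ByteBuffer src) {
        ByteBuffer view = src.duplicate();          // own position/limit, shared content
        ByteBuffer copy = ByteBuffer.allocate(view.remaining());
        copy.put(view);                             // bulk copy of the readable bytes
        copy.flip();                                // make the copy readable from the start
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for the buffer that the library reuses between packets.
        ByteBuffer shared = ByteBuffer.allocate(16);
        shared.put("hello".getBytes(StandardCharsets.UTF_8));
        shared.flip();

        ByteBuffer kept = copyOf(shared);

        // Simulate the next packet overwriting the shared buffer.
        shared.clear();
        shared.put("WORLD".getBytes(StandardCharsets.UTF_8));
        shared.flip();

        System.out.println(StandardCharsets.UTF_8.decode(kept)); // prints "hello"
    }
}
```

Inside a recv callback, one would call `BufferCopy.copyOf(byteBuffer)` and hand the copy to another thread or queue, which also keeps the work done on the NIO thread short.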

Sending ByteBuffer

You can send ByteBuffer like so:

    ByteBuffer b = ByteBuffer.wrap("Hello World".getBytes());
    con.order(b).track().subscribe(
        i -> {
            /* track the number of bytes sent from this order */
        },
        e -> {
            /* an error happened during transmission */
        },
        () -> {
            /* transmission done */
        });

Every order returns a JobHandle that you can use to track the order with track(). If you do not subscribe to this Observable, no packets are sent. You can also order a Flowable:

    Flowable<ByteBuffer> b = Flowable.just(ByteBuffer.wrap("Hello World".getBytes()));
    con.order(b).track().subscribe(
        i -> {
            /* track the number of bytes sent from this order */
        },
        e -> {
            /* an error happened during transmission */
        },
        () -> {
            /* transmission done */
        });

You can also cancel an order using the JobHandle like so:

    Flowable<ByteBuffer> b = Flowable.just(ByteBuffer.wrap("Hello World".getBytes()));
    JobHandle handle = con.order(b);
    handle.track().subscribe(
        i -> {
            /* track the number of bytes sent from this order */
        },
        e -> {
            /* an error happened during transmission */
        },
        () -> {
            /* transmission done */
        });

    // some stuff happens
    
    handle.cancel();
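
The send examples above build their payload with `"Hello World".getBytes()`, which on Java versions before 18 encodes with the platform default charset. The sketch below shows one way to make the encoding explicit on both ends with UTF-8. The class `Utf8Payload` and its methods are hypothetical helpers, not part of RxTCP, and `decode` assumes the buffer holds a complete message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of RxTCP): explicit UTF-8 payload conversion. */
public final class Utf8Payload {
    private Utf8Payload() {}

    /** Wraps the UTF-8 bytes of text in a ByteBuffer that is ready to be sent. */
    public static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes the readable bytes of buf as UTF-8 without moving buf's position. */
    public static String decode(ByteBuffer buf) {
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer b = encode("Hello World");
        System.out.println(b.remaining()); // prints 11
        System.out.println(decode(b));     // prints "Hello World"
    }
}
```

The buffer returned by `encode` can be passed to `con.order(...)` in place of the buffer built in the examples above.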

License

Copyright 2018 Lucien Loiseau

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
