RxTCP is a small single-threaded TCP library built on Java NIO and RxJava. You can add it to your project with Gradle through JitPack:
repositories {
maven { url 'https://jitpack.io' }
}
dependencies {
implementation 'com.github.RightMesh:librxtcp:master-SNAPSHOT'
}
To run a TCP server on a given port just do the following:
RxTCP.Server<RxTCP.Connection> s = new RxTCP.Server<>(port);
s.start().subscribe(
connection -> {
/* connection is a new RxTCP.Connection client */
},
e -> {
/* the server has stopped */
},
() -> {
/* the server has stopped */
}
);
You can also pass the server a factory, which must create an RxTCP.Connection object:
RxTCP.Server<MyOwnTCPConnection> s = new RxTCP.Server<>(port, MyOwnTCPConnection::new);
s.start().subscribe(
connection -> {
/* connection is a new client socket of type MyOwnTCPConnection */
},
e -> {
/* the server has stopped */
},
() -> {
/* the server has stopped */
}
);
To connect to a TCP server, do the following:
new RxTCP.ConnectionRequest<>(inetAddress.getHostAddress(), port)
.connect()
.subscribe(
connection -> {
/* connection socket of type RxTCP.Connection */
},
e -> {
/* connection failed */
});
Similarly, you can also provide a factory:
new RxTCP.ConnectionRequest<>(inetAddress.getHostAddress(), port, MyOwnTCPConnection::new)
.connect()
.subscribe(
connection -> {
/* connection socket of type MyOwnTCPConnection */
},
e -> {
/* connection failed */
});
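The connection snippets above use an inetAddress variable without showing where it comes from. As a minimal sketch using only java.net, the host string passed to the request could be obtained as follows. The class name HostAddressExample and the choice of the loopback address are illustrative assumptions, not part of RxTCP.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not part of RxTCP: builds the host string
// that the connection examples pass as their first argument.
final class HostAddressExample {

    /** Returns the textual form of the IPv4 loopback address. */
    static String loopbackHost() {
        try {
            // getByAddress does no DNS lookup; it only validates the byte length.
            InetAddress inetAddress = InetAddress.getByAddress(new byte[] {127, 0, 0, 1});
            return inetAddress.getHostAddress();
        } catch (UnknownHostException e) {
            // Only thrown for an address of illegal length, which cannot happen here.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(loopbackHost()); // prints 127.0.0.1
    }
}
```

For a remote host, InetAddress.getByName(hostname) returns the same type but may perform a DNS lookup, so it is best called off the NIO thread.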
You can receive data using recv:
con.recv().subscribe(
byteBuffer -> {
/* new bytebuffer received */
},
e -> {
/* connection has closed */
},
() -> {
/* connection has closed */
});
A few points:
- The same ByteBuffer is reused for every new packet received, so you must not modify it outside of this thread.
- The callback for the ByteBuffer runs on the single NIO thread, so any processing done there must be kept minimal and return as soon as possible to avoid slowing down the other sockets.
- When recv terminates, the TCP connection has closed.
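Because the received ByteBuffer is reused, data that must outlive the callback has to be copied before it is handed to another thread. The following is a minimal sketch of such a copy using only java.nio. The BufferCopy class and its copyOf method are hypothetical helpers for illustration, not part of RxTCP.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RxTCP.
final class BufferCopy {

    /**
     * Returns an independent copy of the readable bytes of src.
     * The position and limit of src are left untouched.
     */
    static ByteBuffer copyOf(ByteBuffer src) {
        ByteBuffer copy = ByteBuffer.allocate(src.remaining());
        // duplicate() shares the content but has its own position and limit,
        // so reading from it does not disturb src.
        copy.put(src.duplicate());
        copy.flip(); // make the copy readable from the start
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer copy = copyOf(shared);
        shared.put(0, (byte) 'J'); // simulate the shared buffer being overwritten
        System.out.println(StandardCharsets.UTF_8.decode(copy)); // prints Hello
    }
}
```

Inside the recv callback, one would call copyOf(byteBuffer) and pass the result to a worker thread, which keeps the work done on the NIO thread down to a single memory copy.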
You can send a ByteBuffer like so:
ByteBuffer b = ByteBuffer.wrap("Hello World".getBytes());
con.order(b).track().subscribe(
i -> {
/* track the number of bytes sent from this order */
},
e -> {
/* an error happened during transmission */
},
() -> {
/* transmission done */
});
Every order returns a JobHandle; its track() method returns an Observable that reports the progress of the order. If you do not subscribe to this Observable, no packets are sent. You can also order a Flowable:
Flowable<ByteBuffer> b = Flowable.just(ByteBuffer.wrap("Hello World".getBytes()));
con.order(b).track().subscribe(
i -> {
/* track the number of bytes sent from this order */
},
e -> {
/* an error happened during transmission */
},
() -> {
/* transmission done */
});
You can also cancel an order using the JobHandle like so:
Flowable<ByteBuffer> b = Flowable.just(ByteBuffer.wrap("Hello World".getBytes()));
JobHandle handle = con.order(b);
handle.track().subscribe(
i -> {
/* track the number of bytes sent from this order */
},
e -> {
/* an error happened during transmission */
},
() -> {
/* transmission done */
});
// some stuff happens
handle.cancel();
Copyright 2018 Lucien Loiseau
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.