The simplest way to implement a task queue with Java, Vertx and PostgreSQL.
There are only three participants in CoronaMQ you have to reason about:
- Worker(s) process tasks that are added to the queue. Workers are bound to a label which describes the unit of work. There can be multiple labels (e.g. PLACE_ORDER, CHECKOUT, etc) and thus workers.
- The broker listens to inserts made to the task queue and sends these tasks over the EventBus.
- The TaskRepository is interacting with the queue in the database. You can deploy it together with the broker, but you don't have to.
There is also the Dispatcher: A dispatcher can add tasks to the queue by sending a message on the EventBus. Another way to add tasks is by adding them directly into the tasks-table.
Thanks to PostgresSQL's NOTIFY/LISTEN and the fastest PostgresSQL driver for Java 1 out there, tasks are instantly pushed to the EventBus. There is no polling. To empty the task queue as fast as possible, workers additionally request tasks when they are deployed and after they've completed a task.
The tasks are stored in a PostgreSQL database guaranteeing durability and consistency. Your application might already use a PostgreSQL database in the persistence layer, so you don't have to bring another player to your system architecture. The fewer players, the fewer errors.
Many queues out there guarantee at least once
-delivery which means tasks might get handled twice. But what you really want
is exactly once
delivery. You have one job - and it should be done once. However, in a real world, there are network timeouts,
database errors et al. so the best you can get is effectively once
delivery and this is what CoronaMQ aims for.
CoronaMQ supports the following API-flavors:
<dependency>
<groupId>io.github.jklingsporn</groupId>
<artifactId>coronamq-core</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>io.github.jklingsporn</groupId>
<artifactId>coronamq-mutiny</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>io.github.jklingsporn</groupId>
<artifactId>coronamq-rxjava</artifactId>
<version>0.2</version>
</dependency>
- To run the examples, you need a running docker daemon.
- For a more advanced test, you can use the provided docker file.
- All others need to add the following SQL to their existing database:
CREATE TYPE task_status AS ENUM ('NEW', 'RUNNING','COMPLETED','FAILED');
CREATE TABLE tasks(id UUID,label text, payload JSONB, status task_status, update_time timestamp);
CREATE OR REPLACE FUNCTION task_status_notify()
RETURNS trigger AS
$$
BEGIN
PERFORM pg_notify('coronamq_task_update', jsonb_build_object('id', NEW.id::text, 'payload',new.payload, 'label',new.label)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER task_status_change
AFTER INSERT
ON tasks
FOR EACH ROW
EXECUTE PROCEDURE task_status_notify();
The following example uses the Future
-API
@Test
public void boostrapExample(Vertx vertx, VertxTestContext testContext){
//configurable options
CoronaMqOptions coronaMqOptions = new CoronaMqOptions();
//a simple worker that just completes any task it receives
SimpleWorker worker = new SimpleWorker(vertx, coronaMqOptions);
//an example configuration with a broker, repository and worker
Future<BootstrapSpreadStep> spread = CoronaMq.create(vertx,coronaMqOptions)
.withBroker()
.withRepository()
.withWorker(worker)
.spread(); //spread the wor(l)d
testContext
.assertComplete(spread)
//send a new task with "test"-label to the queue
.compose(s-> s.dispatch("test",new JsonObject().put("someValue","hi")))
.onComplete(testContext.succeeding(UUID::fromString))
//complete the work
.compose(v-> worker.getCurrentWork())
//shut down all components
.compose(v->spread.compose(BootstrapSpreadStep::vaccinate))
.onComplete(testContext.succeedingThenComplete())
;
}
More examples can be found in the examples-module.
This project is a showcase for various cool features that go beyond a simple "Hello world"-example:
- Vertx Service Discovery to detect the repository's availability
- Vertx Service Proxies to remotely interoperate with the repository using it's interface
- Vertx Codegen to generate Mutiny- and RxJava-APIs
- Testcontainers for running the integration tests
- Micrometer Metrics to see what's going on
I originally created the project under the name PoXMQ (Postgres VertX MQ) but wasn't completely satisfied with it. So I made a poll on twitter - what could possibly go wrong?