It is actually Χρόνος (eng. Chronos) - personification of time in Greek mythology.
I've built it after trying to fix MySql driver for Hangfire. Don't get me wrong: Hangfire is an excellent piece of software (MySql driver not so much though), and Χρόνος provides less, but it provides exactly what I needed.
It is lightweight scheduler, build with CQRS in mind. The only thing it really does is: "send this message back to me when the time comes", that's it.
There are two interfaces:
interface IJobScheduler
{
Task<Guid> Schedule(DateTimeOffset time, object message);
}
and:
interface IJobHandler
{
Task Handle(CancellationToken token, object message);
}
IJobScheduler
is meant to schedule a message at a given time.
You can imagine that very naive implementation of IJobScheduler
would be:
class NaiveJobScheduler: IJobScheduler
{
public SimplisticJobScheduler(IJobHandler handler) =>
_handler = handler;
public Task<Guid> Schedule(DateTimeOffset time, object message)
{
var delay = time.Subtract(DateTimeOffset.Now);
Task.Delay(delay).ContinueWith(_ => _handler.Handle(CancellationToken.None, message));
return Task.FromResult(Guid.NewGuid());
}
}
making sure message gets delivered at given time.
Handler needs to be implemented by the user. It is responsible for handling messages. Χρόνος does not care at all how you implement this handler.
The simplistic approach would be to have massive switch statement:
internal class SimplisticJobHandler: IJobHandler
{
public Task Handle(CancellationToken token, object payload)
{
switch (payload)
{
case CommandA a:
await HandleCommandA(a, token);
return;
case CommandB b:
await HandleCommandB(b, token);
return;
//...
default:
throw new ArgumentException(
$"No handler for command {payload.GetType().Name}");
};
}
}
This is, of course, pretty ineffective so I would recommend some message routing, using, for example, (well established) MediatR or, my other creation, Quarterback.
MediatR integration is already implemented, but it if you are curious how it is done you can check here.
So what Χρόνος actually does? Χρόνος is providing three things: persistence, distributed processing and retries.
Name | Nuget | Description |
---|---|---|
K4os.Xpovoc.Abstractions |
Interfaces. Everything you need to use Χρόνος | |
K4os.Xpovoc.Core |
Implementation of core components, like JobScheduler , default binary job serializer ([Serializable] ) and in-memory scheduler using Rx |
|
K4os.Xpovoc.MySql |
JobStorage implementation for MySql |
|
K4os.Xpovoc.PgSql |
JobStorage implementation for Postgres |
|
K4os.Xpovoc.MsSql |
JobStorage implementation for SQL Server |
|
K4os.Xpovoc.SqLite |
JobStorage implementation for Sqlite |
|
K4os.Xpovoc.Mongo |
JobStorage implementation for MongoDb |
|
K4os.Xpovoc.Redis |
JobStorage implementation for Redis |
|
K4os.Xpovoc.Json |
Json serialization for database storage using Newtonsoft.Json | |
K4os.Xpovoc.MediatR |
MediatR integration | |
K4os.Xpovoc.Brighter |
Brighter integration | |
K4os.Xpovoc.Quarterback |
K4os.Quarterback integration |
NOTE: I am a fan of not dragging too many dependencies (because they introduce risk of version mismatch) but some of them I consider "a standard":
- Core: uses Reactive Extensions because I am in denial and I don't believe Rx is not part of the system
- MySql, PgSql, MsSql, and SqLite: all depend on Dapper because dapper is good and should be considered part of
IDbConnection
extensions - MySql: depends on Polly as Polly should be used
- Json: depends on NewtonSoft.Json (of course)
Everything depends on interfaces though which are defined in Abstractions
so you can reimplement any component yourself using whatever technology you like.
One of the principles behind Χρόνος design was modularity. This gives a lot of freedom how things are implemented but also means some complexity setting it up.
IJobHandler
is just an interface with one method to implement, so you can always roll your own,
but if you are reading this you probably don't want to. You just want to get going as quickly as
possible.
There are two quick options for now: using internal very simple "message broker" SimpleJobHandler
or integrate with MediatR.
Register broker itself:
collection.AddSingleton<IJobHandler, SimpleJobHandler>();
and then register handlers:
collection.AddScoped<IJobHandler<MessageA>>, MyMessageAHandler>();
collection.AddScoped<IJobHandler<MessageB>>, MyMessageBHandler>();
collection.AddScoped<IJobHandler<MessageC>>, MyMessageCHandler>();
//...
Please note, that IJobHandler
is registered as singleton. Changing it won't change anything
as it will be resolved only once.
Register MediatR
adapter as IJobHandler
:
collection.AddSingleton<IJobHandler, MediatorJobHandler>();
From now on, just keep registering MediatR
handlers:
collection.AddScoped<IRequestHandler<MessageA>, MyMessageAHandler>();
collection.AddScoped<IRequestHandler<MessageB>, MyMessageBHandler>();
collection.AddScoped<IRequestHandler<MessageC>, MyMessageCHandler>();
//...
Please note, that IJobHandler
is registered as singleton. Changing it won't change anything
as it will be resolved only once.
I also wrote my own mediator which you can find here: K4os.Quarterback.
It it very similar to MediatR
, depends of DI container and requires registration of Quarterback
's job handler
collection.AddSingleton<IJobHandler, QuarterbackJobHandler>();
and then specific command handlers:
collection.AddScoped<ICommandHandler<CommandA>, MyCommandAHandler>();
collection.AddScoped<ICommandHandler<CommandB>, MyCommandBHandler>();
collection.AddScoped<ICommandHandler<CommandC>, MyCommandCHandler>();
That's it.
Ok, but maybe you want to write your own handler. Maybe you want to integrate it with Akka.NET, Orleans, or Brighter. It is not hard but requires some understanding.
Χρόνος is not opinionated how you handle messages, all you need to do is to implement this interface:
interface IJobHandler
{
Task Handle(CancellationToken token, object message);
}
but actually it comes with one caveat: it will be a singleton. It gets injected
(once, through constructor) into IJobScheduler
which is a singleton itself (because you want
it to work all the time) therefore it becomes a singleton (regardless of your container
configuration).
If you want to make your handler a factory of handlers you can. If you want your handler to be created in new scope, you can. In such case I would recommend taking a look at NewScopeJobHandler. You can either inherit from it, or just replicate. The important part is:
async Task IJobHandler.Handle(CancellationToken token, object payload)
{
using (var scope = _provider.CreateScope())
await Handle(token, scope.ServiceProvider, payload);
}
You can do whatever you want from here: connect to Orleans cluster and send a message (to grain which Id is in your message), send message to some Akka actor which will take from here, dispatch it with Brighter, or put the message on some RabbitMQ queue.
Or... you just can have massing switch
statement... Χρόνος does not care
(the Gods of Software do though).
The other option of simple approach is to put a handler on a message itself, so you can:
async Task Handle(CancellationToken token, object payload)
{
switch (payload)
{
case ICanHandleMyself m: return m.Handle(token);
default: throw new ArgumentException("It cannot handle itself");
}
}
Throw IServiceProvider
in new scope to the mix and you can have pretty compelling example
of encapsulation.
The other important component of this system is IJobSceduler
it is responsible for both storing,
monitoring and and retrieving tasks to be executed. You probably want to persist your jobs in database,
so you need to configure DbJobScheduler
:
collection.AddSingleton<IJobScheduler, DbJobScheduler>();
...and that's it (for now, at least).
Actually, Χρόνος comes with two separate implementations: RxJobScheduler
and DbJobScheduler
.
This is a very simple wrapper around Reactive Extensions
IScheduler
. It uses provided or default scheduler to schedule jobs which allows for some
blend of Rx and CQRS. It also enables testing with simulated time.
Please note, this is in memory only scheduler.
This is the "real" scheduler. It requires IJobHandler
(of course) but also requires storage
mechanism. All the persistence is delegated to IDbJobStorage
(read below). Some aspects, the ones
not related to database, can be configured using ISchedulerConfig
(yet, it is not required).
The third component is actual storage. Currently Χρόνος comes with in 5 flavours:
- MySQL
- PostgreSQL
- Microsoft SQL Server
- SqLite
- Memory
I guess both SqLite and Memory are just toys, MySQL is the one which is really used, while both PostgreSQL and Microsoft SQL Server, well... they do pass integration tests but I don't know if they are used in production anywhere.
Just pick the flavour you want and configure it:
collection.AddSingleton<IDbJobStorage, MySqlJobStorage>();
collection.AddSingleton<IMySqlJobStorageConfig>(
new MySqlJobStorageConfig { ConnectionString = "..." });
All DB storage providers implemented by me may take IJobSerializer
which is responsible
from serialization, to and from database. The default one is based on BinaryFormatter
which
is known for being... not the best. I used it as default though because it does not require and
configuration, it just works (well, except for [Serializable]
attribute, I guess).
You can use any serialization mechanism you want though. All you need is to implement this interface:
public interface IJobSerializer
{
string Serialize(object job);
object Deserialize(string payload);
}
Why string, you might ask? Well, there was a decision to make do I store as byte blob which would be (much?) faster but will be inherently unreadable or store as string so I can use JSON for serialization which allows querying jobs if I really need to (at least for MySQL and PostgreSQL). I flipped the coin. Actually, it could be solved with two fields... but it wasn't.
Anyway, you can use any serializer you want: protobuf, Hyperion... I mean I don't know, whatever works for you.
Two things are worth noting:
- If you have binary serializer
Convert.ToBase64String
is you friend - As jobs can be stored for very long time (scheduled something for next year?) you should check if your serialization mechanism is polymorphic and version tolerant, you don't want your old job to become unreadable after few sprints.
689,587,822 total downloads is not a mistake. Newtonsoft.Json is absolutely great and fits Χρόνος very well. It is human readable, polymorphic and version tolerant, all those things have very high value in this case. It is not a fastest one, but it is not used for messaging but for persistence. You can quickly configure it:
collection.AddSingleton<IJobSerializer>(
new JsonJobSerializer(
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto }));
Please note, setting TypeNameHandling
to TypeNameHandling.Auto
makes it polymorphic.
There is a slight problem though, as Newtonsoft.Json
is very version tolerant,
polymorphism in Newtonsoft.Json
is not very version tolerant (when you start refactoring,
moving and renaming your jobs).
I don't want to get into details here, but there are two ways to resolve it:
- DON'T refactor, move nor rename your jobs
- Use K4os.Json.KnownTypes
Ok, so we've seen it all. Let me give you some minimal yet complete examples. All database setup looks roughly the same, one of the differences is that both MySQL and SqLite do not have schemas (well, MySQL kind of does, but not exactly) so they use table name prefix (if you want to use it), while both MsSQL and PostgreSQL can use schema for encapsulation.
There's nothing new here, but let's put all I said before together, for example:
MySQL, using SimpleJobHandler
and DefaultJobSerializer
:
collection.AddSingleton<IJobHandler, SimpleJobHandler>();
collection.AddSingleton<IJobScheduler, DbJobScheduler>();
collection.AddSingleton<IDbJobStorage, MySqlJobStorage>();
collection.AddSingleton<IMySqlJobStorageConfig>(
new MySqlJobStorageConfig { ConnectionString = "<connection string>" });
PostgreSQL, using MediatR
and JsonJobSerializer
:
collection.AddSingleton<IJobHandler, MediatorJobHandler>();
collection.AddSingleton<IJobScheduler, DbJobScheduler>();
collection.AddSingleton<IDbJobStorage, PgSqlJobStorage>();
collection.AddSingleton<IPgSqlJobStorageConfig>(
new PgSqlJobStorageConfig { ConnectionString = "<connection string>" });
collection.AddSingleton<IJobSerializer>(
new JsonJobSerializer(
new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto }));
You did all the things above, and it just does not work. THere is still one thing you didn't do.
You did not start it. It is relatively obscure because there is no obvious Start
method, but
there's no need for one, as all everythinh is started in constructor, so this Start
method
would actually do nothing. All you need to do is to run the constrcutor which will happen when
you request IJobScheduler
from service provider.
It may be different place depending on what are you running: Console app, Web app, ASP.NET Core MVC, I have no idea. When you need to is to call:
provider.GetRequiredService<IJobScheduler>();
as early as possible (some Startup
method or something).
There is one thing which I missed. How to actually schedule a job?
Well, that's simple, just grab IJobScheduler
from DI container and use the only method it has:
await scheduler.Schedule(
DateTimeOffset.Now.AddDays(666),
new TriggerApocalypse());
this message will get persisted in database and delivered back to you when the time comes (as long as are able to deserialize it then).
dotnet tool restore
dotnet r build