feat: persistent durable scheduler #640
The current implementation of the message scheduler in GoAkt is not persistent and requires the actor system to be up and running. Messages sent to a dead/suspended actor are lost.

@sdil the latest commit hash is running GoAkt 3. I am yet to cut a release tag :)
@sdil I will ponder this use case a bit. Perhaps I can come up with something.
@sdil I moved the issue to the GoAkt repo because I believe that's where it is supposed to be :)
@Tochemey I was in doubt whether I should request this feature in eGo or GoAkt. I requested it here because maybe we could make use of the DB adapters we already have (or will write more here). However, if you have a better idea on how to implement this in GoAkt, that would be even better!
@sdil there is no persistence layer in GoAkt like in Akka/Pekko. I did not add one because its whole purpose would have been to implement event sourcing and durable state, and eGo was perfect for that. Moreover, it is very easy for anyone to implement a persistent actor with GoAkt. To add this feature, we need to think about a persistence layer in GoAkt first :)
@sdil I have the feeling this is an application-level feature. For instance, you can create an actor in your application with some state that does the whole scheduling as you pointed out. In there you can handle the lifecycle processes efficiently using PreStart/PostStop. The concept you are suggesting is different from what GoAkt offers at the moment. To achieve it, we would need to overhaul the scheduling feature a bit.
@sdil This is one way to accomplish your feature request. I will provide some steps and code snippets here for your guidance:
message ScheduledJob {
// this is the actor that will receive the schedule message
string actor_id = 1;
// this represents the schedule message
google.protobuf.Any message = 2;
// this defines the cron expression.
// we could also just put the actual time at which to send the message;
// using a cron expression may require additional implementation.
string cron = 3;
}
// JobsStore store scheduled jobs
// This is a POC. In a real-world application, you should use a database
type JobsStore struct {
sync.RWMutex
// an actor could have more than one job in a real implementation;
// this POC keeps at most one job per actor, keyed by actor ID
jobs map[string]*schedulerpb.ScheduledJob
}
// NewJobsStore creates a new job store
func NewJobsStore() *JobsStore {
return &JobsStore{
jobs: make(map[string]*schedulerpb.ScheduledJob),
RWMutex: sync.RWMutex{},
}
}
// Add adds a new job to the store
func (store *JobsStore) Add(scheduledJob *schedulerpb.ScheduledJob) {
store.Lock()
store.jobs[scheduledJob.GetActorId()] = scheduledJob
store.Unlock()
}
// Remove removes a job from the store
func (store *JobsStore) Remove(actorID string) {
store.Lock()
delete(store.jobs, actorID)
store.Unlock()
}
// Get gets a job from the store
func (store *JobsStore) Get(actorID string) *schedulerpb.ScheduledJob {
store.RLock()
defer store.RUnlock()
return store.jobs[actorID]
}
// GetAll gets all jobs from the store
func (store *JobsStore) GetAll() []*schedulerpb.ScheduledJob {
store.RLock()
defer store.RUnlock()
jobs := make([]*schedulerpb.ScheduledJob, 0, len(store.jobs))
for _, job := range store.jobs {
jobs = append(jobs, job)
}
return jobs
}
// IsEmpty checks if the store is empty
func (store *JobsStore) IsEmpty() bool {
store.RLock()
defer store.RUnlock()
return len(store.jobs) == 0
}
// Size returns the number of jobs in the store
func (store *JobsStore) Size() int {
store.RLock()
defer store.RUnlock()
return len(store.jobs)
}
// Clear clears all jobs from the store
func (store *JobsStore) Clear() {
store.Lock()
store.jobs = make(map[string]*schedulerpb.ScheduledJob)
store.Unlock()
}
// Scheduler schedules jobs
type Scheduler struct {
jobsStore *JobsStore
pid *actor.PID
jobs []*schedulerpb.ScheduledJob
}
var _ actor.Actor = (*Scheduler)(nil)
// NewScheduler creates a new scheduler
func NewScheduler() *Scheduler {
return &Scheduler{
jobsStore: NewJobsStore(),
}
}
func (s *Scheduler) PreStart(ctx context.Context) error {
if !s.jobsStore.IsEmpty() {
s.jobs = s.jobsStore.GetAll()
}
return nil
}
func (s *Scheduler) Receive(ctx *actor.ReceiveContext) {
switch msg := ctx.Message().(type) {
case *goaktpb.PostStart:
s.pid = ctx.Self()
case *schedulerpb.ScheduledJob:
s.jobs = append(s.jobs, msg)
s.jobsStore.Add(msg)
// TODO: start ticking for the job
default:
ctx.Unhandled()
}
}
func (s *Scheduler) PostStop(ctx context.Context) error {
for _, job := range s.jobs {
s.jobsStore.Add(job)
}
return nil
}

As you can see, any actor can send a ScheduledJob message to the Scheduler actor.
@sdil I tried to convert this into a discussion rather than an issue, but GitHub does not seem to be working. If you can try to move it to the Q&A discussions, that would be great.
Yup, totally agreed. I like how GoAkt is designed now, so adding this to GoAkt doesn't seem like a good move. I might fork eGo or write this as a library. Thanks for your guidance and the code snippet!
I tried this as well, but it's not working either.
@sdil I suggest you think deeper about it. If you are adding an enhancement to GoAkt, then great. However, if you are making it a general-purpose one, that may not fly :). I am more than happy to assist in brainstorming.
@sdil I will be closing the issue if you are OK with that.
Definitely not in GoAkt, but perhaps eGo. I will try to make it as generic as possible. Sure, go ahead. You can close the issue 😃
Problem description
GoAkt allows you to schedule a message using the scheduler. However, I assume scheduled messages might be lost when some nodes or the entire cluster are shut down.
The use case is for building long-running workflows in actors. For example
Solution description
Write a new actor called SchedulerActor, similar to the projection actor. How it works:
1. The caller schedules a command with SendSync("SchedulerActor", <command, appended with time in the future when it will be executed>).
2. When the scheduled time arrives, the SchedulerActor delivers it with SendAsync(command). The command is then popped from its actor state and deleted from the durable DB.
It's best to run this actor as a cluster singleton. However, eGo is not using GoAkt v3 yet. The advantage of using a cluster singleton rather than implementing this outside the actor system is that the processing loop always runs exactly once in the entire cluster, so we don't need to worry about leader election and such.
Perhaps we don't need actor state at this point and can simply let the actor poll the store for schedules every X interval to keep the implementation simple.
I would appreciate your opinion on making the function signature more ergonomic.
Alternative solutions
Use the GoAkt scheduler, but it is not durable and is susceptible to message loss when nodes or the cluster are shut down.
Additional context
N/A