feat: persistent durable scheduler #640

Closed
sdil opened this issue Mar 1, 2025 · 13 comments
Labels: enhancement, question, wontfix

Comments

@sdil
Collaborator

sdil commented Mar 1, 2025

Problem description

GoAkt lets you schedule a message using its scheduler. However, I assume that a scheduled message can be lost when some nodes or the entire cluster shut down.

The use case is building long-running workflows in actors. For example:

  1. I have a user actor, and I wish to email the user 7 days after they signed up on my system.
  2. I wish to send the user an email with their usage report every 30 days (until the user is offboarded).
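
Both use cases come down to computing a due time from the signup time. A minimal sketch of that arithmetic, assuming fixed 24-hour days; the names `welcomeEmailAt`, `nextReportAt`, `day`, and `signup` are hypothetical and not part of GoAkt or eGo:

```go
package main

import (
	"fmt"
	"time"
)

// day is a fixed 24-hour period. Calendar-aware scheduling (DST, month
// lengths) is out of scope for this sketch.
const day = 24 * time.Hour

// signup is an example fixture, not real data.
var signup = time.Date(2025, time.March, 1, 9, 0, 0, 0, time.UTC)

// welcomeEmailAt returns when the one-off email is due: 7 days after signup.
func welcomeEmailAt(signedUpAt time.Time) time.Time {
	return signedUpAt.Add(7 * day)
}

// nextReportAt returns the first 30-day boundary, counted from signup, that
// lies strictly after now. ok is false once the user has been offboarded.
func nextReportAt(signedUpAt, now time.Time, offboarded bool) (next time.Time, ok bool) {
	if offboarded {
		return time.Time{}, false
	}
	const period = 30 * day
	elapsed := now.Sub(signedUpAt)
	if elapsed < 0 {
		// now is before signup: the first report is one period after signup.
		return signedUpAt.Add(period), true
	}
	n := elapsed/period + 1 // number of whole periods until the next boundary
	return signedUpAt.Add(n * period), true
}

func main() {
	fmt.Println("welcome email due:", welcomeEmailAt(signup))
	if next, ok := nextReportAt(signup, signup.Add(45*day), false); ok {
		fmt.Println("next usage report due:", next)
	}
}
```

A recurring job can then be rescheduled by calling `nextReportAt` again each time a report is sent.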

Solution description

Write a new actor called SchedulerActor, similar to the projection actor. How it works:

  1. When scheduling a message, other actors send a message to SchedulerActor, like SendSync("SchedulerActor", <command, appended with the future time at which it will be executed>).
  2. SchedulerActor adds the command to its actor state and also persists it in a store (whichever DB adapter we want to support).
  3. SchedulerActor runs a processing loop that peeks at its actor state every second to see whether there is any command to send to the target actor. Peeking into actor state is much cheaper than running a DB query, so once per second is tolerable IMO.
  4. When a scheduled message is due, SchedulerActor sends it to the target actor, like SendAsync(command). The command is then popped from the actor state and deleted from the durable DB.
  5. During SchedulerActor PreStart, restore all the schedules from the DB and rehydrate them into the actor state.
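
The in-memory part of steps 2 to 4 can be sketched as a min-heap ordered by due time, so that each one-second peek only looks at the earliest command. This is a sketch under assumptions, not GoAkt code: `command`, `dueQueue`, `schedule`, `popDue`, and `tick` are hypothetical names, and persistence is left out.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// tick is the peek interval from step 3 (once per second).
const tick = time.Second

// command is a hypothetical scheduled message: a target actor name, an
// opaque payload, and the time at which it becomes due.
type command struct {
	target  string
	payload string
	dueAt   time.Time
}

// dueQueue is a min-heap of commands ordered by dueAt.
type dueQueue []command

func (q dueQueue) Len() int           { return len(q) }
func (q dueQueue) Less(i, j int) bool { return q[i].dueAt.Before(q[j].dueAt) }
func (q dueQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *dueQueue) Push(x any)        { *q = append(*q, x.(command)) }
func (q *dueQueue) Pop() any {
	old := *q
	n := len(old)
	c := old[n-1]
	*q = old[:n-1]
	return c
}

// schedule adds a command to the in-memory state (step 2, minus the DB write).
func (q *dueQueue) schedule(c command) { heap.Push(q, c) }

// popDue removes and returns, earliest first, every command whose dueAt is
// not after now (steps 3 and 4, minus the send and the DB delete).
func (q *dueQueue) popDue(now time.Time) []command {
	var due []command
	for q.Len() > 0 && !(*q)[0].dueAt.After(now) {
		due = append(due, heap.Pop(q).(command))
	}
	return due
}

func main() {
	now := time.Now()
	q := &dueQueue{}
	q.schedule(command{target: "user-42", payload: "usage-report", dueAt: now.Add(2 * tick)})
	q.schedule(command{target: "user-7", payload: "welcome-email", dueAt: now.Add(-tick)})
	for _, c := range q.popDue(now) {
		fmt.Printf("send %q to %s\n", c.payload, c.target)
	}
	fmt.Println("still pending:", q.Len())
}
```

With a heap, the per-second check costs a single comparison when nothing is due, which is in line with the point that peeking at actor state is cheaper than querying the DB.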

This is more or less how it works visualized

Image

It's best to run this actor as a cluster singleton. However, eGo is not using GoAkt v3 yet at the moment. The advantage of using a cluster singleton rather than implementing this outside the actor system is that the processing loop always runs exactly once in the entire cluster, and we don't need to care about leader election and such.

Perhaps we don't need actor state at this point and can let the actor poll the store for due schedules every X interval to keep the implementation simple.
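
The stateless variant might look roughly like the sketch below, which polls a store on a ticker and keeps nothing in memory. The `store` interface, `memStore`, `pollOnce`, `run`, `epoch`, and `interval` are all hypothetical; `memStore` only stands in for a real database adapter.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// interval is the polling period ("every X interval"); the value is arbitrary.
const interval = 10 * time.Millisecond

// epoch is a fixed reference instant used as example data.
var epoch = time.Unix(0, 0).UTC()

// job is a hypothetical persisted schedule entry.
type job struct {
	id     string
	target string
	dueAt  time.Time
}

// store is a hypothetical durable store; a real one would wrap a database.
type store interface {
	Due(now time.Time) []job
	Delete(id string)
}

// memStore stands in for a database in this sketch.
type memStore map[string]job

// Due returns the jobs whose dueAt is not after now, earliest first.
func (m memStore) Due(now time.Time) []job {
	var due []job
	for _, j := range m {
		if !j.dueAt.After(now) {
			due = append(due, j)
		}
	}
	sort.Slice(due, func(a, b int) bool { return due[a].dueAt.Before(due[b].dueAt) })
	return due
}

func (m memStore) Delete(id string) { delete(m, id) }

// pollOnce delivers every due job and then deletes it. Delivery comes first,
// so a crash between the two steps re-delivers the job (at-least-once).
func pollOnce(s store, now time.Time, deliver func(job)) int {
	due := s.Due(now)
	for _, j := range due {
		deliver(j)
		s.Delete(j.id)
	}
	return len(due)
}

// run polls the store on every tick until ctx is cancelled.
func run(ctx context.Context, s store, every time.Duration, deliver func(job)) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-t.C:
			pollOnce(s, now, deliver)
		}
	}
}

func main() {
	s := memStore{"j1": {id: "j1", target: "user-1", dueAt: epoch}}
	ctx, cancel := context.WithTimeout(context.Background(), 5*interval)
	defer cancel()
	run(ctx, s, interval, func(j job) { fmt.Println("deliver", j.id, "to", j.target) })
}
```

The trade-off is one store query per interval instead of one in-memory peek, in exchange for not having to rehydrate any state on restart.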

I would appreciate your opinion on how to make the function signatures more ergonomic.

Alternative solutions
Use the GoAkt scheduler, but it is not durable and is susceptible to message loss when nodes or the cluster are shut down.

Additional context
N/A

@Tochemey
Owner

Tochemey commented Mar 2, 2025

The current implementation of the message scheduler in GoAkt is not persistent and requires the actor system to be up and running. Messages sent to a dead/suspended actor are lost.

@sdil the latest commit hash is running GoAkt 3. I am yet to cut a release tag :)

@Tochemey
Owner

Tochemey commented Mar 2, 2025

@sdil I will ponder a bit on this use case. Perhaps I can come up with something.

@Tochemey Tochemey transferred this issue from Tochemey/ego Mar 2, 2025
@Tochemey Tochemey changed the title from "[FEAT] Durable scheduler" to "feat: persistent durable scheduler" Mar 2, 2025
@Tochemey
Owner

Tochemey commented Mar 2, 2025

@sdil I moved the issue to the GoAkt repo because I believe that is where it belongs :)

@sdil
Collaborator Author

sdil commented Mar 2, 2025

@Tochemey I was in doubt whether I should request this feature in eGo or GoAkt. I requested it in eGo because we could perhaps make use of the DB adapters we already have there (or will write more of). However, if you have a better idea of how to implement this in GoAkt, that would be even better!

@Tochemey
Owner

Tochemey commented Mar 2, 2025

@sdil there is no persistence layer in GoAkt like the one in Akka/Pekko. I did not add one because its whole purpose would be to implement event sourcing and durable state, and eGo is perfect for that. Moreover, it is very easy for anyone to implement a persistent actor with GoAkt. To add this feature, we need to think about a persistence layer in GoAkt first :)

@Tochemey
Owner

Tochemey commented Mar 2, 2025

@sdil I have the feeling this is an application feature. For instance, you can create an actor in your application with some state that does all the scheduling, as you described. There you can handle the lifecycle efficiently using PreStart/PostStop. The concept you are suggesting is different from what GoAkt offers at the moment. To achieve it, we would need to overhaul the scheduling feature a bit.
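
As a rough illustration of handling that lifecycle at the application level, the sketch below reloads pending jobs from a JSON file on start and writes them back on stop. It uses plain Go rather than the GoAkt API: `scheduler`, `preStart`, and `postStop` are hypothetical stand-ins for an actor and its PreStart/PostStop hooks, and the JSON file is an assumed substitute for a real database.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"time"
)

// job is a hypothetical scheduled message.
type job struct {
	ActorID string    `json:"actor_id"`
	Payload string    `json:"payload"`
	DueAt   time.Time `json:"due_at"`
}

// scheduler keeps pending jobs in memory and mirrors them to a JSON file.
type scheduler struct {
	path string
	jobs []job
}

// preStart plays the role of an actor's PreStart hook: it reloads the jobs
// persisted by a previous run. A missing file means a fresh start.
func (s *scheduler) preStart() error {
	data, err := os.ReadFile(s.path)
	if errors.Is(err, fs.ErrNotExist) {
		return nil
	}
	if err != nil {
		return err
	}
	return json.Unmarshal(data, &s.jobs)
}

// postStop plays the role of PostStop: it persists the pending jobs, or
// removes the file when none remain. Writing to a temporary file and then
// renaming keeps a crash mid-write from corrupting the previous snapshot.
func (s *scheduler) postStop() error {
	if len(s.jobs) == 0 {
		err := os.Remove(s.path)
		if errors.Is(err, fs.ErrNotExist) {
			return nil
		}
		return err
	}
	data, err := json.Marshal(s.jobs)
	if err != nil {
		return err
	}
	tmp := s.path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, s.path)
}

func main() {
	dir, err := os.MkdirTemp("", "scheduler")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "jobs.json")

	first := &scheduler{path: path}
	first.jobs = append(first.jobs, job{ActorID: "user-1", Payload: "welcome-email", DueAt: time.Now().Add(time.Hour)})
	if err := first.postStop(); err != nil {
		panic(err)
	}

	second := &scheduler{path: path} // simulates a restart
	if err := second.preStart(); err != nil {
		panic(err)
	}
	fmt.Println("restored jobs:", len(second.jobs))
}
```

Saving only on stop does not survive a hard crash, because the stop hook never runs in that case; a durable version would also write on every schedule and delivery.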

Repository owner locked and limited conversation to collaborators Mar 2, 2025
@Tochemey Tochemey converted this issue into a discussion Mar 2, 2025
@Tochemey Tochemey reopened this Mar 2, 2025
@Tochemey Tochemey converted this issue into a discussion Mar 2, 2025
@Tochemey Tochemey reopened this Mar 2, 2025
@Tochemey Tochemey converted this issue into a discussion Mar 2, 2025
@Tochemey Tochemey reopened this Mar 2, 2025
Repository owner unlocked this conversation Mar 2, 2025
@Tochemey
Owner

Tochemey commented Mar 2, 2025

@sdil This is one way to accomplish your feature request. I will provide some steps and code snippets here for your guidance:

  1. Create a proto message that encapsulates a scheduled job as follows:
message ScheduledJob {
  // this is the actor that will receive the schedule message
  string actor_id = 1;
  // this represents the schedule message
  google.protobuf.Any message = 2;
  // this defines the cron expression.
  // we can also just put the actual time at which to send the message;
  // using a cron expression may require additional implementation.
  string cron = 3;
}
  2. Create a durable store for the jobs you want to persist:
// JobsStore stores scheduled jobs.
// This is a POC. In a real-world application, you should use a database.
type JobsStore struct {
	sync.RWMutex
	// POC limitation: jobs are keyed by actor ID, so this map keeps only
	// one job per actor, even though an actor can have more than one job.
	jobs map[string]*schedulerpb.ScheduledJob
}

// NewJobsStore create a new job store
func NewJobsStore() *JobsStore {
	return &JobsStore{
		jobs:    make(map[string]*schedulerpb.ScheduledJob),
		RWMutex: sync.RWMutex{},
	}
}

// Add adds a new job to the store
func (store *JobsStore) Add(scheduledJob *schedulerpb.ScheduledJob) {
	store.Lock()
	store.jobs[scheduledJob.GetActorId()] = scheduledJob
	store.Unlock()
}

// Remove removes a job from the store
func (store *JobsStore) Remove(actorID string) {
	store.Lock()
	delete(store.jobs, actorID)
	store.Unlock()
}

// Get gets a job from the store
func (store *JobsStore) Get(actorID string) *schedulerpb.ScheduledJob {
	store.RLock()
	defer store.RUnlock()
	return store.jobs[actorID]
}

// GetAll gets all jobs from the store
func (store *JobsStore) GetAll() []*schedulerpb.ScheduledJob {
	store.RLock()
	defer store.RUnlock()
	jobs := make([]*schedulerpb.ScheduledJob, 0, len(store.jobs))
	for _, job := range store.jobs {
		jobs = append(jobs, job)
	}
	return jobs
}

// IsEmpty checks if the store is empty
func (store *JobsStore) IsEmpty() bool {
	store.RLock()
	defer store.RUnlock()
	return len(store.jobs) == 0
}

// Size returns the number of jobs in the store
func (store *JobsStore) Size() int {
	store.RLock()
	defer store.RUnlock()
	return len(store.jobs)
}

// Clear clears all jobs from the store
func (store *JobsStore) Clear() {
	store.Lock()
	store.jobs = make(map[string]*schedulerpb.ScheduledJob)
	store.Unlock()
}
  3. Create the Scheduler actor as follows:
// Scheduler schedules jobs
type Scheduler struct {
	jobsStore *JobsStore
	pid       *actor.PID
	jobs      []*schedulerpb.ScheduledJob
}

var _ actor.Actor = (*Scheduler)(nil)

// NewScheduler create a new scheduler
func NewScheduler() *Scheduler {
	return &Scheduler{
		jobsStore: NewJobsStore(),
	}
}

func (s *Scheduler) PreStart(ctx context.Context) error {
	if !s.jobsStore.IsEmpty() {
		s.jobs = s.jobsStore.GetAll()
	}
	return nil
}

func (s *Scheduler) Receive(ctx *actor.ReceiveContext) {
	switch msg := ctx.Message().(type) {
	case *goaktpb.PostStart:
		s.pid = ctx.Self()
	case *schedulerpb.ScheduledJob:
		s.jobs = append(s.jobs, msg)
		s.jobsStore.Add(msg)
		// TODO: start ticking for the job
	default:
		ctx.Unhandled()
	}
}

func (s *Scheduler) PostStop(ctx context.Context) error {
	for _, job := range s.jobs {
		s.jobsStore.Add(job)
	}
	return nil
}

As you can see, any actor can send a ScheduledJob to the Scheduler for it to process.
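
One possible way to fill in the "start ticking for the job" TODO is to arm one timer per job with `time.AfterFunc`. The sketch below is an assumption about how that could look, not the approach GoAkt takes: `job` is a simplified stand-in for `schedulerpb.ScheduledJob` with an absolute due time instead of a cron expression, and `jobTimers`, `start`, and `cancel` are hypothetical names. The `deliver` callback is where a real actor would send the message to the target.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// job is a simplified, hypothetical stand-in for schedulerpb.ScheduledJob.
type job struct {
	actorID string
	message string
	dueAt   time.Time
}

// jobTimers arms one timer per actor ID and calls deliver when a job is due.
type jobTimers struct {
	mu      sync.Mutex
	timers  map[string]*time.Timer
	deliver func(job)
}

func newJobTimers(deliver func(job)) *jobTimers {
	return &jobTimers{timers: make(map[string]*time.Timer), deliver: deliver}
}

// start arms a timer for j, replacing any pending timer for the same actor.
// A job that is already overdue (for example one restored after a restart)
// fires right away, because AfterFunc treats a non-positive delay as zero.
func (t *jobTimers) start(j job, now time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if old, ok := t.timers[j.actorID]; ok {
		old.Stop()
	}
	var tm *time.Timer
	tm = time.AfterFunc(j.dueAt.Sub(now), func() {
		t.mu.Lock()
		// Only forget this timer if it has not been replaced in the meantime.
		if t.timers[j.actorID] == tm {
			delete(t.timers, j.actorID)
		}
		t.mu.Unlock()
		t.deliver(j)
	})
	t.timers[j.actorID] = tm
}

// cancel stops the pending timer for actorID and reports whether it was
// stopped before firing.
func (t *jobTimers) cancel(actorID string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	tm, ok := t.timers[actorID]
	if !ok {
		return false
	}
	delete(t.timers, actorID)
	return tm.Stop()
}

func main() {
	delivered := make(chan job, 1)
	timers := newJobTimers(func(j job) { delivered <- j })
	now := time.Now()
	timers.start(job{actorID: "user-1", message: "welcome-email", dueAt: now.Add(20 * time.Millisecond)}, now)
	j := <-delivered
	fmt.Printf("delivered %q to %s\n", j.message, j.actorID)
}
```

On restart, calling `start` for every job returned by the store would re-arm the timers, with overdue jobs delivered immediately.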

Repository owner locked and limited conversation to collaborators Mar 2, 2025
@Tochemey Tochemey converted this issue into a discussion Mar 2, 2025
@Tochemey Tochemey reopened this Mar 2, 2025
Repository owner unlocked this conversation Mar 2, 2025
@Tochemey Tochemey self-assigned this Mar 2, 2025
@Tochemey Tochemey added the feature New feature or request label Mar 2, 2025
Repository owner locked and limited conversation to collaborators Mar 2, 2025
Repository owner unlocked this conversation Mar 2, 2025
@Tochemey
Owner

Tochemey commented Mar 2, 2025

@sdil I tried to convert this to a discussion rather than an issue, but GitHub doesn't seem to be working. If you can move it to the Q&A discussions, that would be great.

@Tochemey Tochemey removed their assignment Mar 2, 2025
@Tochemey Tochemey removed feature New feature or request enhancement labels Mar 2, 2025
Repository owner locked and limited conversation to collaborators Mar 2, 2025
@Tochemey Tochemey converted this issue into a discussion Mar 2, 2025
@Tochemey Tochemey reopened this Mar 2, 2025
Repository owner unlocked this conversation Mar 2, 2025
@Tochemey Tochemey added question Further information is requested enhancement wontfix This will not be worked on labels Mar 2, 2025
@sdil
Collaborator Author

sdil commented Mar 2, 2025

> I have the feeling this seems an application feature

Yup, totally agreed. I like how GoAkt is designed now, so adding this to GoAkt doesn't seem like a good move. I might fork eGo or write this as a library.

Thanks for your guidance and the code snippet!

Repository owner locked and limited conversation to collaborators Mar 2, 2025
@sdil sdil converted this issue into a discussion Mar 2, 2025
@sdil sdil reopened this Mar 2, 2025
@sdil
Collaborator Author

sdil commented Mar 2, 2025

> I tried to convert this as a discussion rather than an issue but Github seems not working

I tried this as well, but it isn't working for me either.

@sdil sdil converted this issue into a discussion Mar 2, 2025
@sdil sdil reopened this Mar 2, 2025
@Tochemey
Owner

Tochemey commented Mar 3, 2025

@sdil I would suggest you think more deeply about it. If you are adding an enhancement to GoAkt, then great. However, if you are making it a general-purpose one, that may not fly. :) I am more than happy to assist in brainstorming.

@Tochemey
Owner

Tochemey commented Mar 3, 2025

@sdil I will close the issue if that is OK with you.

Repository owner unlocked this conversation Mar 3, 2025
@sdil
Collaborator Author

sdil commented Mar 3, 2025

Definitely not in GoAkt, but perhaps in eGo. I will try to make it as generic as possible.

Sure, go ahead. You can close the issue 😃

@Tochemey Tochemey closed this as completed Mar 3, 2025