Skip to content

RSDK-10965 Adding the jobmanager to RDK #5104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

bkaravan
Copy link
Member

@bkaravan bkaravan commented Jul 1, 2025

A draft PR to get feedback on the Job Manager logic.

From the ticket, the job manager:

  • Should have a concept of a “job”
  • Should add or remove jobs according to mutations of the jobs field in the config
  • Should log when jobs trigger and whether they succeed or fail
  • Should gracefully shutdown along with machine
  • Should have tests

Here is how this PR answers these goals:

  • We rely on the "job" definition from gocron. While we considered creating an internal "Job" struct, it seemed to be a layer of abstraction that did not provide any useful features. The JobConfig already has most of the required data to start the job, so a separate struct seemed unnecessary.
  • When the config file gets updated, the diff gets populates Added, Removed, and Modified fields with their respective JobConfig. After that, jobmanager.UpdateJobs() is called within reconfigure, which is responsible for correctly stopping/starting jobs.
  • For logging patterns, explore jobmanager.go
  • A Shutdown() method is introduced and is called at the end of localrobot.Close()
  • Tests are still to be created. There are tests for config-related changes, but not for the jobmanager logic. Will be done after feedback for this part settles!

We have also decided to not use the cronexpr library to parse the cron expression during Validation. cronexpr could allow a 7th year field, while gocron does not support that. Currently, Validation only checks the required fields of the JobConfig, and any "bad schedule" messages are displayed when the jobmanager is trying to actually start the job (in case of an error, no job will be created)

@bkaravan bkaravan requested review from benjirewis and a team July 1, 2025 20:00
@viambot viambot added the safe to test This pull request is marked safe to test from a trusted zone label Jul 1, 2025
) (*Jobmanager, error) {
jobLogger := logger.Sublogger("job_manager")
// we do not want deduplication on jobs
jobLogger.NeverDeduplicate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per offline discussion, I recommended that we never deduplicate for jobs so that jobs that run every 1s, e.g., are visible in their logging and not squashed unnecessarily.

toPrint := buffer.String()
toPrint = strings.ReplaceAll(toPrint, "\n", "")
toPrint = strings.ReplaceAll(toPrint, "\"", "")
jm.logger.CInfow(jm.ctx, "Job succeeded", "name", jc.Name, "result", toPrint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give an example of what this log looks like when it's output?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally. Here is how it looks after replacements:
2025-07-01T21:20:07.714Z INFO rdk.job_manager jobmanager/jobmanager.go:172 Job succeeded {"name":"my motor job","result":"{ isOn: false, powerPct: 0}"}
Here is how it looks without these strings replacements:
2025-07-01T19:58:50.847Z INFO rdk.job_manager jobmanager/jobmanager.go:171 Job succeeded {"name":"my motor job","result":"{\n \"isOn\": false,\n \"powerPct\": 0\n}\n"}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems good to me! Could we change result to be response so it's clear it's a gRPC response?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally, will do tomorrow

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If buffer is guaranteed to be JSON, could you unmarshal it into a struct and pass that into the "w" loggers, or maybe use a custom String()? I feel ReplaceAll to transform json is usually a last resort in many languages, because it's slow (generally speaking -- maybe not applicable here) and if the response can contain some strings that would be more useful if they're kept as-is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had an attempt where I called json.Unmarshall() directly, but could not quite get it to work. I am happy to give it another try though, currently having some laptop issues but I will give it another go tomorrow.

On that note, do you know if go will support deserializing it into a generic struct? My understanding was that you need to know the struct fields you are getting beforehand, and pass that as an argument to Unmarshall()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but I just glanced through at the InvokeRPC src and you might even be able to get the response back as the native proto.Message via handler.OnReceiveResponse. That might be pretty useful to have, even just for additional logging flexibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will explore!

@benjirewis
Copy link
Member

Awesome job! Some initial questions + nits.


jobManager, err := jobmanager.New(ctx, logger, getResource, r.webSvc.ModuleAddresses())
if err != nil {
logger.Warn("Unable to start the job scheduler")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed this logger line as well. What should be the correct thing to do here - do we log an error or a warning and return nil, err; or is this error going to get logged anyway when we return it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; you probably don't need the warn log line. I believe viam-server would fail to start up with error serving web: [error from failure to create job manager], but you could double-check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, will double check this

@bkaravan bkaravan requested a review from benjirewis July 2, 2025 14:21
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 2, 2025

jobManager, err := jobmanager.New(ctx, logger, getResource, r.webSvc.ModuleAddresses())
if err != nil {
logger.Warn("Unable to start the job scheduler")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; you probably don't need the warn log line. I believe viam-server would fail to start up with error serving web: [error from failure to create job manager], but you could double-check.

functionToRun := func() {
resource, err := jm.getResource(jc.Resource)
if err != nil {
jm.logger.CWarnw(jm.ctx, "Could not get resource", "error", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should give each job its own Sublogger of rdk.job_manager? That way errors like this can easily be associated with a job? We could also use WithFields to always associate a job name with logs for a certain job. I'd pick one or the other, but up to you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we would create a Sublogger inside the functionToRun()? Not sure how expensive a call to Sublogger is, especially on quick jobs, it might be setting up/tearing it down quite often. Could that be a reason to create a Job struct, that would hold a logger and the function for each job? Potentially, WithFields would be a better fit to not add this other layer of abstraction

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say let's avoid creating a Sublogger in every functionToRun() call. Whether that requires a Job struct for another layer of abstraction I'm not sure. Feel free to play around with it, but I think a Job being a function and a logging.Logger is a fair, new type to add if need be.

jm.logger.CWarnw(jm.ctx, "Could not get resource", "error", err)
return
}
jm.logger.CInfow(jm.ctx, "Job triggered", "name", jc.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use a Sublogger or a WithFields logger as I suggest above, you probably don't need the "name" field here or anywhere else.

toPrint := buffer.String()
toPrint = strings.ReplaceAll(toPrint, "\n", "")
toPrint = strings.ReplaceAll(toPrint, "\"", "")
jm.logger.CInfow(jm.ctx, "Job succeeded", "name", jc.Name, "result", toPrint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems good to me! Could we change result to be response so it's clear it's a gRPC response?

Copy link
Member

@cheukt cheukt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me so far!

return nil, err
}

dialAddr := "unix://" + parentAddr.UnixAddr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the server was running in TCP_MODE?


// createJobFunction returns a function that the job scheduler puts on its queue.
func (jm *Jobmanager) createJobFunction(jc config.JobConfig) func() {
if jc.Method == "DoCommand" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just combine this with the function below?

Comment on lines +108 to +130
refCtx := metadata.NewOutgoingContext(jm.ctx, nil)
refClient := grpcreflect.NewClientV1Alpha(refCtx, reflectpb.NewServerReflectionClient(jm.conn))
reflSource := grpcurl.DescriptorSourceFromServer(jm.ctx, refClient)
descSource := reflSource

resourceType := resource.Name().API.SubtypeName
services, err := descSource.ListServices()
if err != nil {
jm.logger.CWarnw(jm.ctx, "Could not get a list of available grpc services", "error", err)
return
}
var grpcService string
for _, srv := range services {
if strings.Contains(srv, resourceType) {
grpcService = srv
break
}
}
if grpcService == "" {
jm.logger.CWarn(jm.ctx, fmt.Sprintf("could not find a service for type: %s", resourceType))
return
}
grpcMethod := grpcService + "." + jc.Method
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels like a good function to abstract away into a separate function, since the logic is pretty self contained

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
safe to test This pull request is marked safe to test from a trusted zone
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants