RSDK-10965 Adding the jobmanager to RDK #5104
) (*Jobmanager, error) {
	jobLogger := logger.Sublogger("job_manager")
	// we do not want deduplication on jobs
	jobLogger.NeverDeduplicate()
Per offline discussion, I recommended that we never deduplicate for jobs so that jobs that run every 1s, e.g., are visible in their logging and not squashed unnecessarily.
toPrint := buffer.String()
toPrint = strings.ReplaceAll(toPrint, "\n", "")
toPrint = strings.ReplaceAll(toPrint, "\"", "")
jm.logger.CInfow(jm.ctx, "Job succeeded", "name", jc.Name, "result", toPrint)
Could you give an example of what this log looks like when it's output?
Totally. Here is how it looks after replacements:
2025-07-01T21:20:07.714Z INFO rdk.job_manager jobmanager/jobmanager.go:172 Job succeeded {"name":"my motor job","result":"{ isOn: false, powerPct: 0}"}
Here is how it looks without these string replacements:
2025-07-01T19:58:50.847Z INFO rdk.job_manager jobmanager/jobmanager.go:171 Job succeeded {"name":"my motor job","result":"{\n \"isOn\": false,\n \"powerPct\": 0\n}\n"}
Seems good to me! Could we change `result` to be `response` so it's clear it's a gRPC response?
Totally, will do tomorrow
If `buffer` is guaranteed to be JSON, could you unmarshal it into a struct and pass that into the "w" loggers, or maybe use a custom `String()`? I feel using `ReplaceAll` to transform JSON is usually a last resort in many languages, because it's slow (generally speaking; maybe not applicable here) and because the response may contain strings that would be more useful if kept as-is.
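One way to collapse the response onto a single line without `ReplaceAll` is the standard library's `json.Compact`, which drops insignificant whitespace and leaves string values untouched. The following is a minimal sketch, not the PR's code; the helper name `compactJSON` and the sample payload are illustrative assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// compactJSON is a hypothetical helper: it removes insignificant whitespace
// from a JSON document and returns an error if the input is not valid JSON.
// Quotes and the contents of string values are left as they are.
func compactJSON(raw []byte) (string, error) {
	var out bytes.Buffer
	if err := json.Compact(&out, raw); err != nil {
		return "", fmt.Errorf("response is not valid JSON: %w", err)
	}
	return out.String(), nil
}

func main() {
	// Sample payload shaped like the multi-line response shown in the log above.
	raw := []byte("{\n  \"isOn\": false,\n  \"powerPct\": 0\n}\n")
	s, err := compactJSON(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s) // prints {"isOn":false,"powerPct":0}
}
```

Unlike the `ReplaceAll` approach, this keeps the quotes, so the logged value is still valid JSON.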
I had an attempt where I called `json.Unmarshal()` directly, but could not quite get it to work. I am happy to give it another try though; I'm currently having some laptop issues, but I will give it another go tomorrow.
On that note, do you know if Go supports deserializing it into a generic struct? My understanding was that you need to know the struct fields you are getting beforehand, and pass that as an argument to `Unmarshal()`.
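On the generic-struct question: `encoding/json` can decode into a `map[string]any` (or a plain `any`) when the fields are not known ahead of time. A minimal sketch follows; the helper name `decodeGeneric` and the sample payload are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeGeneric is a hypothetical helper that parses a JSON object without
// a predeclared struct. JSON numbers decode to float64, objects to
// map[string]any, and arrays to []any.
func decodeGeneric(raw []byte) (map[string]any, error) {
	var out map[string]any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, fmt.Errorf("decoding response: %w", err)
	}
	return out, nil
}

func main() {
	resp, err := decodeGeneric([]byte(`{"isOn": false, "powerPct": 0}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(resp["isOn"], resp["powerPct"]) // prints false 0
}
```

The resulting map could then be handed to a structured logger as a single field, assuming the logger knows how to encode maps.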
Not sure, but I just glanced through the `InvokeRPC` src and you might even be able to get the response back as the native `proto.Message` via `handler.OnReceiveResponse`. That might be pretty useful to have, even just for additional logging flexibility.
Will explore!
Awesome job! Some initial questions + nits.
jobManager, err := jobmanager.New(ctx, logger, getResource, r.webSvc.ModuleAddresses())
if err != nil {
	logger.Warn("Unable to start the job scheduler")
Just noticed this logger line as well. What is the correct thing to do here: do we log an error or a warning and return `nil, err`, or is this error going to get logged anyway when we return it?
Good point; you probably don't need the warn log line. I believe `viam-server` would fail to start up with `error serving web: [error from failure to create job manager]`, but you could double-check.
Makes sense, will double check this
functionToRun := func() {
	resource, err := jm.getResource(jc.Resource)
	if err != nil {
		jm.logger.CWarnw(jm.ctx, "Could not get resource", "error", err)
Do you think we should give each job its own `Sublogger` of `rdk.job_manager`? That way errors like this can easily be associated with a job. We could also use `WithFields` to always associate a job name with logs for a certain job. I'd pick one or the other, but up to you.
Do you think we would create a `Sublogger` inside the `functionToRun()`? I'm not sure how expensive a call to `Sublogger` is; on quick jobs especially, it might be set up and torn down quite often. Could that be a reason to create a `Job` struct that would hold a logger and the function for each job? Potentially, `WithFields` would be a better fit, since it avoids adding this other layer of abstraction.
I would say let's avoid creating a `Sublogger` in every `functionToRun()` call. Whether that requires a `Job` struct for another layer of abstraction I'm not sure. Feel free to play around with it, but I think a `Job` being a function and a `logging.Logger` is a fair, new type to add if need be.
		jm.logger.CWarnw(jm.ctx, "Could not get resource", "error", err)
		return
	}
	jm.logger.CInfow(jm.ctx, "Job triggered", "name", jc.Name)
If you use a `Sublogger` or a `WithFields` logger as I suggest above, you probably don't need the `"name"` field here or anywhere else.
looks good to me so far!
	return nil, err
}

dialAddr := "unix://" + parentAddr.UnixAddr
what if the server was running in TCP_MODE?
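One possible way to handle both cases is to pick the dial target from whichever address is set. This is a sketch under assumptions: `parentAddrs` is a hypothetical stand-in for the value returned by `ModuleAddresses()`; only the `UnixAddr` field appears in the PR, and the `TCPAddr` field and the precedence between the two are guesses.

```go
package main

import (
	"errors"
	"fmt"
)

// parentAddrs is a hypothetical stand-in for the address struct in the PR.
// UnixAddr is taken from the diff; TCPAddr is an assumption.
type parentAddrs struct {
	UnixAddr string
	TCPAddr  string
}

// dialAddr returns a dial target, preferring the unix socket when present
// and falling back to the TCP address otherwise.
func dialAddr(p parentAddrs) (string, error) {
	switch {
	case p.UnixAddr != "":
		return "unix://" + p.UnixAddr, nil
	case p.TCPAddr != "":
		return p.TCPAddr, nil
	default:
		return "", errors.New("no parent address available")
	}
}

func main() {
	addr, err := dialAddr(parentAddrs{TCPAddr: "127.0.0.1:14998"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(addr) // prints 127.0.0.1:14998
}
```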
// createJobFunction returns a function that the job scheduler puts on its queue.
func (jm *Jobmanager) createJobFunction(jc config.JobConfig) func() {
	if jc.Method == "DoCommand" {
why not just combine this with the function below?
refCtx := metadata.NewOutgoingContext(jm.ctx, nil)
refClient := grpcreflect.NewClientV1Alpha(refCtx, reflectpb.NewServerReflectionClient(jm.conn))
reflSource := grpcurl.DescriptorSourceFromServer(jm.ctx, refClient)
descSource := reflSource

resourceType := resource.Name().API.SubtypeName
services, err := descSource.ListServices()
if err != nil {
	jm.logger.CWarnw(jm.ctx, "Could not get a list of available grpc services", "error", err)
	return
}
var grpcService string
for _, srv := range services {
	if strings.Contains(srv, resourceType) {
		grpcService = srv
		break
	}
}
if grpcService == "" {
	jm.logger.CWarn(jm.ctx, fmt.Sprintf("could not find a service for type: %s", resourceType))
	return
}
grpcMethod := grpcService + "." + jc.Method
feels like a good function to abstract away into a separate function, since the logic is pretty self contained
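The service lookup could be pulled out along these lines. This is a sketch of the suggested refactor, not code from the PR: the function name `findGRPCService` and the service names in `main` are illustrative, and the matching rule (`strings.Contains`) is copied from the hunk above.

```go
package main

import (
	"fmt"
	"strings"
)

// findGRPCService returns the first service whose name contains resourceType,
// mirroring the loop in the diff. Returning an error lets the caller decide
// how to log the failure.
func findGRPCService(services []string, resourceType string) (string, error) {
	for _, srv := range services {
		if strings.Contains(srv, resourceType) {
			return srv, nil
		}
	}
	return "", fmt.Errorf("could not find a service for type: %s", resourceType)
}

func main() {
	// Example service names; a real list would come from descSource.ListServices().
	services := []string{
		"viam.component.arm.v1.ArmService",
		"viam.component.motor.v1.MotorService",
	}
	srv, err := findGRPCService(services, "motor")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(srv + "." + "IsOn") // prints viam.component.motor.v1.MotorService.IsOn
}
```

Keeping the lookup free of logger and connection state also makes it straightforward to unit test with a plain slice of names.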
A draft PR to get feedback on the Job Manager logic.
From the ticket, the job manager:
Here is how this PR answers these goals:
- […] `gocron`. While we considered creating an internal "Job" struct, it seemed to be a layer of abstraction that did not provide any useful features. The `JobConfig` already has most of the required data to start the job, so a separate struct seemed unnecessary.
- […] `Added`, `Removed`, and `Modified` fields with their respective `JobConfig`. After that, `jobmanager.UpdateJobs()` is called within reconfigure, which is responsible for correctly stopping/starting jobs.
- […] `Shutdown()` method is introduced and is called at the end of `localrobot.Close()`.
- […] `jobmanager` logic. Will be done after feedback for this part settles!

We have also decided to not use the `cronexpr` library to parse the cron expression during `Validation`. `cronexpr` could allow a 7th year field, while `gocron` does not support that. Currently, `Validation` only checks the required fields of the `JobConfig`, and any "bad schedule" messages are displayed when the `jobmanager` is trying to actually start the job (in case of an error, no job will be created).