Skip to content

Commit

Permalink
cloud: Make status messages more detailed
Browse files Browse the repository at this point in the history
ctessum committed Sep 30, 2018
1 parent 6cc45c3 commit 0cbe9d2
Showing 9 changed files with 278 additions and 69 deletions.
4 changes: 2 additions & 2 deletions cloud/blob.go
Original file line number Diff line number Diff line change
@@ -69,11 +69,11 @@ func (c *Client) Output(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.J
o := &cloudrpc.JobOutput{
Files: make(map[string][]byte),
}
//k8sJob, err := c.getk8sJob(ctx, job)
//TODO: k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return nil, err
}
//addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
//TODO: addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
addrs, err := c.jobOutputAddresses(ctx, job.Name, []string{"inmap", "run", "steady"})
if err != nil {
return nil, err
61 changes: 45 additions & 16 deletions cloud/client.go
Original file line number Diff line number Diff line change
@@ -87,13 +87,25 @@ func NewClient(k kubernetes.Interface, root *cobra.Command, config *viper.Viper,
return c, nil
}

// Create creates (and queues) a Kubernetes job with the given name that executes
// RunJob creates (and queues) a Kubernetes job that executes the command and
// command-line arguments in the given job specification on the client's
// container image. job.MemoryGB specifies the minimum required memory for execution.
func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.JobStatus, error) {
if job.Version != inmap.Version {
return nil, fmt.Errorf("incorrect InMAP version: %s != %s", job.Version, inmap.Version)
}

status, err := c.Status(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
if err != nil {
return nil, err
}
if status.Status != cloudrpc.Status_Failed { //TODO: status.Status != cloudrpc.Status_Missing && {
// Only create the job if it is missing or failed.
return status, nil
}
// TODO: Is this necessary?
c.Delete(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})

if err := c.stageInputs(ctx, job); err != nil {
return nil, err
}
@@ -107,20 +119,11 @@ func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.J
k8sJob := createJob(userJobName(user, job.Name), job.Cmd, job.Args, c.Image, core.ResourceList{
core.ResourceMemory: resource.MustParse(fmt.Sprintf("%dGi", job.MemoryGB)),
})
k8sJobResult, err := c.jobControl.Create(k8sJob)
_, err = c.jobControl.Create(k8sJob)
if err != nil {
return nil, err
}
return c.jobStatus(k8sJobResult)
}

// Status returns the status of the given job.
func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.JobStatus, error) {
k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return nil, err
}
return c.jobStatus(k8sJob)
return c.Status(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
}

// Delete deletes the given job.
@@ -167,10 +170,36 @@ func userJobName(user, name string) string {
return strings.Replace(user, "_", "-", -1) + "-" + strings.Replace(name, "_", "-", -1)
}

func (c *Client) jobStatus(j *batch.Job) (*cloudrpc.JobStatus, error) {
return &cloudrpc.JobStatus{
Status: j.Status.String(),
}, nil
// Status returns the status of the given job.
//
// The Kubernetes job lookup below is currently commented out, so the result
// is determined only by checkOutputs: if it returns an error, the status is
// set to Status_Failed and Message describes that error; otherwise s is
// returned with all of its fields at their zero values. The returned error
// is always nil.
//
// NOTE(review): the zero value of the Status field presumably corresponds to
// Complete (value 0 of the Status enum in cloud.proto) — confirm that jobs
// that were never created should be reported this way.
func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.JobStatus, error) {
s := new(cloudrpc.JobStatus)
// Disabled: derive Missing/Complete/Failed/Running and the start and
// completion times from the Kubernetes job itself.
/*k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return &cloudrpc.JobStatus{
Status: cloudrpc.Status_Missing,
Message: err.Error(),
}, nil
}
for _, c := range k8sJob.Status.Conditions {
if c.Type == batch.JobComplete && c.Status == core.ConditionTrue {
s.Status = cloudrpc.Status_Complete
s.StartTime = k8sJob.Status.StartTime.Time.Unix()
s.CompletionTime = k8sJob.Status.CompletionTime.Time.Unix()
} else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue {
s.Status = cloudrpc.Status_Failed
}
}
if k8sJob.Status.Active > 0 {
s.Status = cloudrpc.Status_Running
s.StartTime = k8sJob.Status.StartTime.Time.Unix()
}*/
//TODO: err = c.checkOutputs(ctx, name, k8sJob.Spec.Template.Spec.Containers[0].Command)
// Until the lookup above is restored, the command is hard-coded instead of
// being read from the Kubernetes job's container specification.
err := c.checkOutputs(ctx, job.Name, []string{"inmap", "run", "steady"})
if err != nil {
// A failed output check is reported through the status and message
// rather than through the returned error.
s.Status = cloudrpc.Status_Failed
s.Message = fmt.Sprintf("job completed but the following error occurred when checking outputs: %s", err)
}
return s, nil
}

// createJob creates a Kubernetes job specification with the given name that executes the
4 changes: 2 additions & 2 deletions cloud/client_test.go
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ func TestClient_fake(t *testing.T) {
t.Fatal(err)
}
wantStatus := &cloudrpc.JobStatus{
Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
// Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
}
if !reflect.DeepEqual(wantStatus, status) {
t.Errorf("status:\n%+v\n!=\n%+v", status, wantStatus)
@@ -112,7 +112,7 @@ func TestClient_fake(t *testing.T) {
t.Fatal(err)
}
wantStatus := &cloudrpc.JobStatus{
Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
// Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
}
if !reflect.DeepEqual(wantStatus, status) {
t.Errorf("status:\n%+v\n!=\n%+v", status, wantStatus)
18 changes: 15 additions & 3 deletions cloud/cloud.proto
Original file line number Diff line number Diff line change
@@ -23,8 +23,8 @@ service CloudRPC {
// output file(s).
rpc RunJob(JobSpec) returns (JobStatus) {}

// OutputAddresses returns status and the addresses the output file(s) of the
// requested simulation name.
// Status returns the status of the simulation with the
// requested name.
rpc Status(JobName) returns(JobStatus) {}

// Output returns the output file(s) of the
@@ -61,9 +61,21 @@ message JobSpec {
map<string,bytes> FileData = 7;
}

// Status enumerates the states that a job can be reported to be in.
//
// NOTE(review): Complete is the zero value, so under proto3 default-value
// semantics a JobStatus whose Status field was never set would presumably
// read as Complete — confirm that this is intended.
enum Status {
// Complete: presumably the job finished successfully — confirm against
// the server's Status implementation.
Complete = 0;
// Failed: presumably the job failed or its outputs could not be
// verified — confirm.
Failed = 1;
// Missing: presumably no job with the requested name exists — confirm.
Missing = 2;
// Running: presumably the job is currently executing — confirm.
Running = 3;
}

message JobStatus {
// Status holds the current status of the job.
string Status = 1;
Status Status = 1;
string Message = 2;

// StartTime and CompletionTime are Unix times: the number of seconds
// elapsed since January 1, 1970 UTC.
int64 StartTime = 3;
int64 CompletionTime = 4;
}

message JobOutput {
133 changes: 97 additions & 36 deletions cloud/cloudrpc/cloud.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 79 additions & 7 deletions cloud/cloudrpc/cloudrpcgojs/cloud.pb.gopherjs.go
28 changes: 28 additions & 0 deletions cloud/config.go
Original file line number Diff line number Diff line change
@@ -59,6 +59,34 @@ func (c *Client) jobOutputAddresses(ctx context.Context, name string, cmd []stri
return o, nil
}

// checkOutputs verifies that every output file expected for the job with the
// given name and command can be opened for reading in the client's bucket.
//
// The expected locations are obtained from jobOutputAddresses, and each
// address is expanded with expandShp before it is checked. The function
// returns the first error encountered while resolving the addresses, opening
// the bucket, parsing an address, or opening or closing a reader. It returns
// nil if every file could be opened and closed.
func (c *Client) checkOutputs(ctx context.Context, name string, cmd []string) error {
	addrs, err := c.jobOutputAddresses(ctx, name, cmd)
	if err != nil {
		return err
	}
	bucket, err := OpenBucket(ctx, c.bucketName)
	if err != nil {
		return err
	}
	for _, addr := range addrs {
		for _, fname := range expandShp(addr) {
			// Named u rather than url so that the net/url package is not
			// shadowed for the rest of the loop body.
			u, err := url.Parse(fname)
			if err != nil {
				return err
			}
			// The leading slash(es) of the URL path are stripped to form
			// the key that is looked up in the bucket.
			r, err := bucket.NewReader(ctx, strings.TrimLeft(u.Path, "/"))
			if err != nil {
				return err
			}
			if r.Size() == 0 {
				// TODO: a zero-length output file is detected here but is
				// not yet reported as an error.
			}
			// Report a failed Close instead of silently dropping it.
			if err := r.Close(); err != nil {
				return err
			}
		}
	}
	return nil
}

// setOutputPaths changes the paths of the output files in the given
// job specification so that they match
// the locations where the files should be stored.
3 changes: 3 additions & 0 deletions cloud/jobspec.go
Original file line number Diff line number Diff line change
@@ -181,6 +181,9 @@ func fileContentsAndSum(filePath string) ([]byte, string, error) {
if _, err := io.Copy(&dst, src); err != nil {
return nil, "", err
}
if err := src.Close(); err != nil {
return nil, "", err
}
sumBytes := sha256.Sum256(dst.Bytes())
return dst.Bytes(), fmt.Sprintf("%x", sumBytes[0:sha256.Size]), nil
}
10 changes: 7 additions & 3 deletions inmaputil/cloud_test.go
Original file line number Diff line number Diff line change
@@ -22,9 +22,11 @@ import (
"context"
"io/ioutil"
"os"
"reflect"
"testing"

"github.com/spatialmodel/inmap/cloud"
"github.com/spatialmodel/inmap/cloud/cloudrpc"
)

func TestCloud(t *testing.T) {
@@ -50,9 +52,11 @@ func TestCloud(t *testing.T) {
t.Fatal(err)
}

wantStatus := "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}"
if status.Status != wantStatus {
t.Errorf("wrong status: %s != %s", status.Status, wantStatus)
wantStatus := &cloudrpc.JobStatus{
// Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
}
if !reflect.DeepEqual(status, wantStatus) {
t.Errorf("wrong status: %v != %v", status, wantStatus)
}
})

0 comments on commit 0cbe9d2

Please sign in to comment.