From 1c4833dbeae42a43d0c913c9eff9a33d463adf66 Mon Sep 17 00:00:00 2001 From: Daniel Gruber Date: Mon, 10 Feb 2025 20:31:09 +0100 Subject: [PATCH] Add qstat -g d (#30) * EH: Watch qacct file; improved types * BF: Fixing go.mod file for simulator * Add qstat -g d --- pkg/qacct/v9.0/file_test.go | 23 ++++---- pkg/qstat/v9.0/parser.go | 108 ++++++++++++++++++++++++++++++++++ pkg/qstat/v9.0/parser_test.go | 43 ++++++++++++++ pkg/qstat/v9.0/qstat.go | 2 + pkg/qstat/v9.0/qstat_impl.go | 12 +++- 5 files changed, 176 insertions(+), 12 deletions(-) diff --git a/pkg/qacct/v9.0/file_test.go b/pkg/qacct/v9.0/file_test.go index d223804..a0464ae 100644 --- a/pkg/qacct/v9.0/file_test.go +++ b/pkg/qacct/v9.0/file_test.go @@ -25,26 +25,27 @@ var _ = Describe("File", func() { It("returns a channel that emits JobDetail objects for 10 jobs", func() { + jobDetailsChan, err := qacct.WatchFile(context.Background(), + qacct.GetDefaultQacctFile(), 0) + Expect(err).NotTo(HaveOccurred()) + Expect(jobDetailsChan).NotTo(BeNil()) + qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{}) Expect(err).NotTo(HaveOccurred()) jobIDs := make([]int, 10) for i := 0; i < 10; i++ { - jobID, _, err := qs.Submit(context.Background(), qsub.JobOptions{ - Command: "echo", - CommandArgs: []string{fmt.Sprintf("job %d", i+1)}, - Binary: qsub.ToPtr(true), - }) + jobID, _, err := qs.Submit(context.Background(), + qsub.JobOptions{ + Command: "/bin/bash", + CommandArgs: []string{"-c", fmt.Sprintf("echo job %d; sleep 0", i+1)}, + Binary: qsub.ToPtr(true), + }) Expect(err).NotTo(HaveOccurred()) log.Printf("jobID: %d", jobID) jobIDs[i] = int(jobID) } - jobDetailsChan, err := qacct.WatchFile(context.Background(), - qacct.GetDefaultQacctFile(), 0) - Expect(err).NotTo(HaveOccurred()) - Expect(jobDetailsChan).NotTo(BeNil()) - receivedJobs := make(map[int]bool) Eventually(func() bool { select { @@ -52,7 +53,7 @@ var _ = Describe("File", func() { log.Printf("job: %+v", jd.JobNumber) // check if jobID is in the jobIDs list if slices.Contains(jobIDs, int(jd.JobNumber)) { - Expect(jd.SubmitCommandLine).To(ContainSubstring("echo 'job")) + Expect(jd.SubmitCommandLine).To(ContainSubstring("bash")) Expect(jd.JobUsage.Usage.Memory).To(BeNumerically(">=", 0)) receivedJobs[int(jd.JobNumber)] = true } diff --git a/pkg/qstat/v9.0/parser.go b/pkg/qstat/v9.0/parser.go index 6e40623..e722399 100644 --- a/pkg/qstat/v9.0/parser.go +++ b/pkg/qstat/v9.0/parser.go @@ -830,3 +830,111 @@ func ParseClusterQueueSummary(out string) ([]ClusterQueueSummary, error) { return summaries, nil } + +/* +qstat -g d +job-ID prior name user state submit/start at queue slots ja-task-ID +----------------------------------------------------------------------------------------------------------------- + + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 1 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 3 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 5 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 7 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 25 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 27 + 36 0.60500 sleep root qw 2025-02-10 16:52:21 2 + 37 0.60500 sleep root qw 2025-02-10 16:52:35 2 + 38 0.60500 sleep root qw 2025-02-10 16:52:49 2 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 1 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 2 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 3 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 8 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 9 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 10 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 29 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 31 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 99 + 34 0.50500 sleep root qw 2025-02-10 16:51:51 1 +*/ +func ParseJobArrayTask(out string) ([]JobArrayTask, error) { + lines := strings.Split(out, "\n") + + jobArrayTasks := make([]JobArrayTask, 0, len(lines)-3) + + for _, line := range lines[2:] { + fields := strings.Fields(line) + if len(fields) < 8 { + continue + } + jobID, err := strconv.Atoi(fields[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse jobID: %v", err) + } + priority, err := strconv.ParseFloat(fields[1], 64) + if err != nil { + return nil, fmt.Errorf("failed to parse priority: %v", err) + } + name := fields[2] + user := fields[3] + state := fields[4] + timeString := fields[5] + " " + fields[6] + jobTime, err := time.Parse("2006-01-02 15:04:05", timeString) + if err != nil { + return nil, fmt.Errorf("failed to parse submit time: %v", err) + } + var submitTime time.Time + var startTime time.Time + if strings.Contains(state, "qw") { + startTime = jobTime + } else { + submitTime = jobTime + } + + // if fields[7] is not a number, it is the queue name + var slots int + var taskID int + var queue string + + // when waiting there is no queue name + if slotsInt, err := strconv.Atoi(fields[7]); err != nil { + queue = fields[7] + if len(fields) > 8 { + slots, _ = strconv.Atoi(fields[8]) + } + if len(fields) > 9 { + taskID, _ = strconv.Atoi(fields[9]) + } + } else { + slots = slotsInt + // waiting jobs + if len(fields) > 8 { + slots, _ = strconv.Atoi(fields[8]) + } + if len(fields) > 9 { + taskID, err = strconv.Atoi(fields[9]) + if err != nil { + // a single job and parallel job has no taskID + taskID = 0 + } + } + } + + jobInfo := JobInfo{ + JobID: jobID, + Priority: priority, + Name: name, + User: user, + State: state, + SubmitTime: submitTime, + StartTime: startTime, + Queue: queue, + Slots: slots, + JaTaskIDs: []int64{int64(taskID)}, + } + jobArrayTasks = append(jobArrayTasks, JobArrayTask{ + JobInfo: jobInfo, + }) + + } + return jobArrayTasks, nil +} diff --git a/pkg/qstat/v9.0/parser_test.go b/pkg/qstat/v9.0/parser_test.go index b45f211..d1da3fa 100644 --- a/pkg/qstat/v9.0/parser_test.go +++ b/pkg/qstat/v9.0/parser_test.go @@ -452,4 +452,47 @@ test.q 0.08 0 0 2 2 0 0 }) + Describe("JobArrayTask", func() { + + It("should parse the output of qstat -g d", func() { + input := `job-ID prior name user state submit/start at queue slots ja-task-ID +----------------------------------------------------------------------------------------------------------------- + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 1 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 3 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 5 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 23 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 25 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 27 + 36 0.60500 sleep root qw 2025-02-10 16:52:21 2 + 37 0.60500 sleep root qw 2025-02-10 16:52:35 2 + 38 0.60500 sleep root qw 2025-02-10 16:52:49 2 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 1 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 2 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 95 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 97 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 99 + 34 0.50500 sleep root qw 2025-02-10 16:51:51 1 +` + jobArrayTasks, err := qstat.ParseJobArrayTask(input) + Expect(err).NotTo(HaveOccurred()) + Expect(len(jobArrayTasks)).To(Equal(15)) + + Expect(jobArrayTasks).To(ContainElement(qstat.JobArrayTask{ + JobInfo: qstat.JobInfo{ + JobID: 33, + Priority: 0.505, + Name: "sleep", + User: "root", + State: "r", + SubmitTime: time.Date(2025, 2, 10, 16, 47, 18, 0, time.UTC), + StartTime: time.Time{}, + Queue: "all.q@master", + Slots: 1, + JaTaskIDs: []int64{1}, + }, + })) + }) + + }) + }) diff --git a/pkg/qstat/v9.0/qstat.go b/pkg/qstat/v9.0/qstat.go index 09c569f..4a97977 100644 --- a/pkg/qstat/v9.0/qstat.go +++ b/pkg/qstat/v9.0/qstat.go @@ -45,7 +45,9 @@ type QStat interface { ShowFullOutputWithResources(resourceAttributes string) ([]JobInfo, error) // qstat -g c DisplayClusterQueueSummary() ([]ClusterQueueSummary, error) + // qstat -g d shows all job array tasks individually DisplayAllJobArrayTasks() ([]JobArrayTask, error) + // qstat -g p shows all parallel job tasks individually DisplayAllParallelJobTasks() ([]ParallelJobTask, error) // qstat -help ShowHelp() (string, error) diff --git a/pkg/qstat/v9.0/qstat_impl.go b/pkg/qstat/v9.0/qstat_impl.go index 983b713..f1146e8 100644 --- a/pkg/qstat/v9.0/qstat_impl.go +++ b/pkg/qstat/v9.0/qstat_impl.go @@ -178,8 +178,18 @@ func (q *QStatImpl) DisplayClusterQueueSummary() ([]ClusterQueueSummary, error) return ParseClusterQueueSummary(out) } +// DisplayAllJobArrayTasks is equivalent to "qstat -g d" func (q *QStatImpl) DisplayAllJobArrayTasks() ([]JobArrayTask, error) { - return nil, fmt.Errorf("not implemented") + out, err := q.NativeSpecification([]string{"-g", "d"}) + if err != nil { + return nil, fmt.Errorf("failed to get output of qstat: %w", err) + } + jobArrayTasks, err := ParseJobArrayTask(out) + if err != nil { + return nil, fmt.Errorf("failed to parse job array tasks: %w", err) + } + + return jobArrayTasks, nil } func (q *QStatImpl) DisplayAllParallelJobTasks() ([]ParallelJobTask, error) {