Skip to content

Commit

Permalink
Add batch status API
Browse files Browse the repository at this point in the history
Currently the status of a completed or failed batch job is held only in
memory, so a simple restart loses the status and the user no longer has
any visibility into a job that was long running.

In addition to the metrics, add a new API that reads the batch status
from the drives. A batch job will be cleaned up three days after
completion.

Also add the batch type to the batch ID. The reason is that the batch
job request is removed immediately when the job is finished, after which
the type of the batch job is no longer known, making it difficult to
locate the job report.
  • Loading branch information
Anis Eleuch committed May 7, 2024
1 parent b413ff9 commit 91d3a99
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 42 deletions.
3 changes: 3 additions & 0 deletions cmd/admin-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/list-jobs").HandlerFunc(
adminMiddleware(adminAPI.ListBatchJobs))

adminRouter.Methods(http.MethodGet).Path(adminVersion + "/status-job").HandlerFunc(
adminMiddleware(adminAPI.BatchJobStatus))

adminRouter.Methods(http.MethodGet).Path(adminVersion + "/describe-job").HandlerFunc(
adminMiddleware(adminAPI.DescribeBatchJob))
adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/cancel-job").HandlerFunc(
Expand Down
2 changes: 1 addition & 1 deletion cmd/batch-expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
JobType: string(job.Type()),
StartTime: job.Started,
}
if err := ri.load(ctx, api, job); err != nil {
if err := ri.loadOrInit(ctx, api, job); err != nil {
return err
}

Expand Down
218 changes: 180 additions & 38 deletions cmd/batch-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"math/rand"
"net/http"
"net/url"
"path/filepath"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -57,6 +58,11 @@ import (

var globalBatchConfig batch.Config

const (
// oldJobsExpiration is how long the stats of a completed/failed job are
// kept before being removed (3 days).
oldJobsExpiration = 3 * 24 * time.Hour
)

// BatchJobRequest this is an internal data structure not for external consumption.
type BatchJobRequest struct {
ID string `yaml:"-" json:"name"`
Expand Down Expand Up @@ -262,7 +268,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
JobType: string(job.Type()),
StartTime: job.Started,
}
if err := ri.load(ctx, api, job); err != nil {
if err := ri.loadOrInit(ctx, api, job); err != nil {
return err
}
if ri.Complete {
Expand Down Expand Up @@ -722,60 +728,83 @@ const (
batchReplJobDefaultRetryDelay = 250 * time.Millisecond
)

func getJobReportPath(job BatchJobRequest) string {
func getJobPath(job BatchJobRequest) string {
return pathJoin(batchJobPrefix, job.ID)
}

func (ri *batchJobInfo) getJobReportPath() (string, error) {
var fileName string
switch {
case job.Replicate != nil:
switch madmin.BatchJobType(ri.JobType) {
case madmin.BatchJobReplicate:
fileName = batchReplName
case job.KeyRotate != nil:
case madmin.BatchJobKeyRotate:
fileName = batchKeyRotationName
case job.Expire != nil:
case madmin.BatchJobExpire:
fileName = batchExpireName
default:
return "", fmt.Errorf("unknown job type: %v", ri.JobType)
}
return pathJoin(batchJobReportsPrefix, job.ID, fileName)
return pathJoin(batchJobReportsPrefix, ri.JobID, fileName), nil
}

func getJobPath(job BatchJobRequest) string {
return pathJoin(batchJobPrefix, job.ID)
// loadOrInit loads the saved job info for the given request. When load
// reports errNoSuchJob, ri is instead initialized with the version and retry
// attempts matching the request's job type, and nil is returned. Any other
// result of load (including success) is returned unchanged.
func (ri *batchJobInfo) loadOrInit(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
	loadErr := ri.load(ctx, api, job)
	if !errors.Is(loadErr, errNoSuchJob) {
		// Either the info was loaded (nil) or loading failed for a reason
		// other than the job being absent.
		return loadErr
	}

	// Nothing saved yet: start from the defaults of the requested job type,
	// letting a positive retry setting in the request override the default.
	switch {
	case job.Replicate != nil:
		ri.Version = batchReplVersionV1
		ri.RetryAttempts = batchReplJobDefaultRetries
		if attempts := job.Replicate.Flags.Retry.Attempts; attempts > 0 {
			ri.RetryAttempts = attempts
		}
	case job.KeyRotate != nil:
		ri.Version = batchKeyRotateVersionV1
		ri.RetryAttempts = batchKeyRotateJobDefaultRetries
		if attempts := job.KeyRotate.Flags.Retry.Attempts; attempts > 0 {
			ri.RetryAttempts = attempts
		}
	case job.Expire != nil:
		ri.Version = batchExpireVersionV1
		ri.RetryAttempts = batchExpireJobDefaultRetries
		if attempts := job.Expire.Retry.Attempts; attempts > 0 {
			ri.RetryAttempts = attempts
		}
	}
	return nil
}

// load resolves the report path of the given job request and reads the
// saved job info from that path into ri. A failure to resolve the path is
// logged and returned; otherwise the result of loadByPath is returned.
func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
	reportPath, pathErr := job.getJobReportPath()
	if pathErr == nil {
		return ri.loadByPath(ctx, api, reportPath)
	}
	batchLogIf(ctx, pathErr)
	return pathErr
}

func (ri *batchJobInfo) loadByPath(ctx context.Context, api ObjectLayer, path string) error {
var format, version uint16
switch {
case job.Replicate != nil:
switch filepath.Base(path) {
case batchReplName:
version = batchReplVersionV1
format = batchReplFormat
case job.KeyRotate != nil:
case batchKeyRotationName:
version = batchKeyRotateVersionV1
format = batchKeyRotationFormat
case job.Expire != nil:
case batchExpireName:
version = batchExpireVersionV1
format = batchExpireFormat
default:
return errors.New("no supported batch job request specified")
}
data, err := readConfig(ctx, api, getJobReportPath(job))

data, err := readConfig(ctx, api, path)
fmt.Println(path, err)
if err != nil {
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
ri.Version = int(version)
switch {
case job.Replicate != nil:
ri.RetryAttempts = batchReplJobDefaultRetries
if job.Replicate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts
}
case job.KeyRotate != nil:
ri.RetryAttempts = batchKeyRotateJobDefaultRetries
if job.KeyRotate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts
}
case job.Expire != nil:
ri.RetryAttempts = batchExpireJobDefaultRetries
if job.Expire.Retry.Attempts > 0 {
ri.RetryAttempts = job.Expire.Retry.Attempts
}
}
return nil
return errNoSuchJob
}
return err
}
Expand Down Expand Up @@ -919,7 +948,12 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
if err != nil {
return err
}
return saveConfig(ctx, api, getJobReportPath(job), buf)
path, err := ri.getJobReportPath()
if err != nil {
batchLogIf(ctx, err)
return err
}
return saveConfig(ctx, api, path, buf)
}
ri.mu.Unlock()
return nil
Expand Down Expand Up @@ -971,7 +1005,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
JobType: string(job.Type()),
StartTime: job.Started,
}
if err := ri.load(ctx, api, job); err != nil {
if err := ri.loadOrInit(ctx, api, job); err != nil {
return err
}
if ri.Complete {
Expand Down Expand Up @@ -1436,10 +1470,24 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error {
}

func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
deleteConfig(ctx, api, getJobReportPath(j))
deleteConfig(ctx, api, getJobPath(j))
}

// getJobReportPath returns the path of the report file for this job request,
// built from the reports prefix, the job ID and a file name chosen by the
// job type that is set on the request. An error is returned when none of
// Replicate, KeyRotate or Expire is set.
func (j BatchJobRequest) getJobReportPath() (string, error) {
	reportPath := func(reportName string) (string, error) {
		return pathJoin(batchJobReportsPrefix, j.ID, reportName), nil
	}

	switch {
	case j.Replicate != nil:
		return reportPath(batchReplName)
	case j.KeyRotate != nil:
		return reportPath(batchKeyRotationName)
	case j.Expire != nil:
		return reportPath(batchExpireName)
	}
	return "", errors.New("unknown job type")
}

func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error {
if j.Replicate == nil && j.KeyRotate == nil && j.Expire == nil {
return errInvalidArgument
Expand Down Expand Up @@ -1542,6 +1590,55 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
batchLogIf(ctx, json.NewEncoder(w).Encode(&listResult))
}

// BatchJobStatus - returns the status of a batch job saved in the disk
// BatchJobStatus - returns the status of a batch job saved in the disk
//
// The job is identified by the "jobId" form value. The job type is taken
// from the part of the ID that precedes the first "-"; an ID that is empty,
// has no such prefix, or whose prefix is not a known batch job type is
// rejected before any read is attempted. On success the response body is the
// JSON encoding of a madmin.BatchJobStatus carrying the job's last metric.
func (a adminAPIHandlers) BatchJobStatus(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	objectAPI, _ := validateAdminReq(ctx, w, r, policy.ListBatchJobsAction)
	if objectAPI == nil {
		return
	}

	jobID := r.Form.Get("jobId")
	if jobID == "" {
		writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL)
		return
	}

	// The report location depends on the job type, and the only place the
	// type can be recovered from is the ID prefix. Without a separator there
	// is no prefix at all, so treat the type as unknown instead of letting an
	// untyped request reach load, which would log a server-side error for
	// what is merely bad client input.
	jobType, _, found := strings.Cut(jobID, "-")
	if !found {
		jobType = ""
	}

	req := BatchJobRequest{ID: jobID}
	switch madmin.BatchJobType(jobType) {
	case madmin.BatchJobReplicate:
		req.Replicate = &BatchJobReplicateV1{}
	case madmin.BatchJobKeyRotate:
		req.KeyRotate = &BatchJobKeyRotateV1{}
	case madmin.BatchJobExpire:
		req.Expire = &BatchJobExpire{}
	default:
		writeErrorResponseJSON(ctx, w, toAPIError(ctx, errors.New("job ID format unrecognized")), r.URL)
		return
	}

	ri := &batchJobInfo{}
	if err := ri.load(ctx, objectAPI, req); err != nil {
		// A missing job is an expected outcome of a status query; only
		// other failures are worth logging.
		if !errors.Is(err, errNoSuchJob) {
			batchLogIf(ctx, err)
		}
		writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	buf, err := json.Marshal(madmin.BatchJobStatus{LastMetric: ri.metric()})
	if err != nil {
		batchLogIf(ctx, err)
		writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The response has already started, so a write failure can only be logged.
	if _, err := w.Write(buf); err != nil {
		batchLogIf(ctx, err)
	}
}

// errNoSuchJob is the sentinel error reported when no saved data exists for
// the requested batch job; callers compare against it with errors.Is.
var errNoSuchJob = errors.New("no such job")

// DescribeBatchJob returns the currently active batch job definition
Expand Down Expand Up @@ -1633,7 +1730,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
return
}

job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints))
job.ID = fmt.Sprintf("%s-%s:%d", job.Type(), shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints))
job.User = user
job.Started = time.Now()

Expand Down Expand Up @@ -1722,9 +1819,54 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP
}
jpool.ResizeWorkers(workers)
jpool.resume()

go jpool.cleanupReports()

return jpool
}

// cleanupReports periodically walks the saved batch job reports and deletes
// the report of every job that is marked complete or failed and whose last
// update is older than oldJobsExpiration. It loops until GlobalContext is
// done, so it is meant to be run in its own goroutine.
func (j *BatchJobPool) cleanupReports() {
	randomWait := func() time.Duration {
		// randomWait depends on the number of nodes to avoid triggering the cleanup at the same time
		return time.Duration(rand.Float64() * float64(time.Duration(globalEndpoints.NEndpoints())*time.Hour))
	}

	// sweep performs a single cleanup pass. It is a function of its own so
	// that the deferred cancel fires at the end of every pass: a defer placed
	// directly in the timer loop below would only run when cleanupReports
	// returns, leaving one live context (and one pending deferred call) behind
	// per pass for the lifetime of the process.
	sweep := func() {
		ctx, cancel := context.WithCancel(j.ctx)
		defer cancel()

		results := make(chan itemOrErr[ObjectInfo], 100)
		if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobReportsPrefix, results, WalkOptions{}); err != nil {
			batchLogIf(j.ctx, err)
			return
		}
		for result := range results {
			if result.Err != nil {
				batchLogIf(j.ctx, result.Err)
				continue
			}
			ri := &batchJobInfo{}
			if err := ri.loadByPath(ctx, j.objLayer, result.Item.Name); err != nil {
				batchLogIf(ctx, err)
				continue
			}
			// Only finished jobs are eligible; a job that is still running
			// keeps its report regardless of age.
			if (ri.Complete || ri.Failed) && time.Since(ri.LastUpdate) > oldJobsExpiration {
				deleteConfig(ctx, j.objLayer, result.Item.Name)
			}
		}
	}

	t := time.NewTimer(randomWait())
	defer t.Stop()

	for {
		select {
		case <-GlobalContext.Done():
			return
		case <-t.C:
			sweep()
			// Re-arm with a fresh random delay whether or not the pass succeeded.
			t.Reset(randomWait())
		}
	}
}

func (j *BatchJobPool) resume() {
results := make(chan itemOrErr[ObjectInfo], 100)
ctx, cancel := context.WithCancel(j.ctx)
Expand Down Expand Up @@ -1988,7 +2130,7 @@ func (m *batchJobMetrics) purgeJobMetrics() {
var toDeleteJobMetrics []string
m.RLock()
for id, metrics := range m.metrics {
if time.Since(metrics.LastUpdate) > 24*time.Hour && (metrics.Complete || metrics.Failed) {
if time.Since(metrics.LastUpdate) > oldJobsExpiration && (metrics.Complete || metrics.Failed) {
toDeleteJobMetrics = append(toDeleteJobMetrics, id)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/batch-rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
JobType: string(job.Type()),
StartTime: job.Started,
}
if err := ri.load(ctx, api, job); err != nil {
if err := ri.loadOrInit(ctx, api, job); err != nil {
return err
}
if ri.Complete {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,5 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
)

replace github.com/minio/madmin-go/v3 => github.com/vadmeste/madmin-go/v3 v3.0.0-20240506102820-f788488bde85
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,6 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/kms-go/kes v0.3.0 h1:SU8VGVM/Hk9w1OiSby3OatkcojooUqIdDHl6dtM6NkY=
github.com/minio/kms-go/kes v0.3.0/go.mod h1:w6DeVT878qEOU3nUrYVy1WOT5H1Ig9hbDIh698NYJKY=
github.com/minio/madmin-go/v3 v3.0.51 h1:brGOvDP8KvoHb/bdzCHUPFCbTtrN8o507uPHZpyuinM=
github.com/minio/madmin-go/v3 v3.0.51/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/mc v0.0.0-20240430174448-dcb911bed9d5 h1:VDXLzvY0Jxk4lzIntGXZuw0VH7S1JgQBmjWGkz7xphU=
github.com/minio/mc v0.0.0-20240430174448-dcb911bed9d5/go.mod h1:aOiBeSNmpfJn1yyz+EujrTM+XmUwkXiM69zSXg12VDM=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
Expand Down Expand Up @@ -650,6 +648,8 @@ github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqri
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/unrolled/secure v1.14.0 h1:u9vJTU/pR4Bny0ntLUMxdfLtmIRGvQf2sEFuA0TG9AE=
github.com/unrolled/secure v1.14.0/go.mod h1:BmF5hyM6tXczk3MpQkFf1hpKSRqCyhqcbiQtiAF7+40=
github.com/vadmeste/madmin-go/v3 v3.0.0-20240506102820-f788488bde85 h1:z952me4F6QDel4xehUm+P9FnGutbrpYVj9VH0uiATog=
github.com/vadmeste/madmin-go/v3 v3.0.0-20240506102820-f788488bde85/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/vbauerster/mpb/v8 v8.7.3 h1:n/mKPBav4FFWp5fH4U0lPpXfiOmCEgl5Yx/NM3tKJA0=
Expand Down

0 comments on commit 91d3a99

Please sign in to comment.