From 5c81e7cba4cb1466b498bc75b6df5da6f152f884 Mon Sep 17 00:00:00 2001 From: "Jeffrey N. Johnson" Date: Wed, 6 Nov 2024 16:52:08 -0800 Subject: [PATCH] Splitting taskType into TransferTask and TransferSubtask types. A transfer task now has one or more subtasks, depending on how many endpoints are involved in a file transfer. --- tasks/tasks.go | 613 ++++++++++++++++++++++++++++++------------------- 1 file changed, 382 insertions(+), 231 deletions(-) diff --git a/tasks/tasks.go b/tasks/tasks.go index 277bad4..60a95dd 100644 --- a/tasks/tasks.go +++ b/tasks/tasks.go @@ -49,6 +49,7 @@ import ( ) // useful type aliases +type Contributor = frictionless.Contributor type Database = databases.Database type DataPackage = frictionless.DataPackage type DataResource = frictionless.DataResource @@ -71,7 +72,7 @@ const ( type Specification struct { // a Markdown description of the transfer task Description string - // the name of destination database from which files are transferred (as + // the name of destination database to which files are transferred (as // specified in the DTS config file) Destination string // machine-readable instructions for processing the payload at its destination @@ -86,140 +87,75 @@ type Specification struct { UserInfo auth.UserInfo } -// this type holds multiple (possibly null) UUIDs corresponding to different +// This type tracks subtasks within a transfer (e.g. files transferred from +// multiple endpoints attached to a single source/destination database pair). +// It holds multiple (possibly null) UUIDs corresponding to different // states in the file transfer lifecycle -type taskType struct { - Canceled bool // set if a cancellation request has been made - CompletionTime time.Time // time at which the transfer completed - Description string // Markdown description of the task - Destination string // name of destination database - DestinationFolder string // folder path to which files are transferred - FileIds []string // IDs of files within Source - Id uuid.UUID // task identifier - Instructions json.RawMessage // machine-readable task processing instructions - Manifest uuid.NullUUID // manifest generation UUID (if any) - ManifestFile string // name of locally-created manifest file - PayloadSize float64 // Size of payload (gigabytes) - Resources []DataResource // Frictionless DataResources for files - Source string // name of source database - Staging uuid.NullUUID // staging UUID (if any) - Status TransferStatus // status of file transfer operation - Transfer uuid.NullUUID // file transfer UUID (if any) - UserInfo auth.UserInfo // info about user requesting transfer -} - -// This error type is returned when a payload is requested that is too large. -type PayloadTooLargeError struct { - size float64 // size of the requested payload in gigabytes -} - -func (e PayloadTooLargeError) Error() string { - return fmt.Sprintf("Requested payload is too large: %g GB (limit is %g GB).", - e.size, config.Service.MaxPayloadSize) -} - -// computes the size of a payload for a transfer task (in gigabytes) -func payloadSize(resources []DataResource) float64 { - var size uint64 - for _, resource := range resources { - size += uint64(resource.Bytes) - } - return float64(size) / float64(1024*1024*1024) -} - -// starts a task going, initiating staging if needed -func (task *taskType) start() error { - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) - if err != nil { - return err - } - - // resolve file paths using file IDs - task.Resources, err = source.Resources(task.FileIds) - if err != nil { - return err - } - - // make sure the size of the payload doesn't exceed our specified limit - task.PayloadSize = payloadSize(task.Resources) // (in GB) - if task.PayloadSize > config.Service.MaxPayloadSize { - return &PayloadTooLargeError{size: task.PayloadSize} - } - +type TransferSubtask struct { + Destination string // name of destination database (in config) + DestinationEndpoint string // name of destination database (in config) + DestinationFolder string // folder path to which files are transferred + Resources []DataResource // Frictionless DataResources for files + Source string // name of source database (in config) + SourceEndpoint string // name of source endpoint (in config) + Staging uuid.NullUUID // staging UUID (if any) + StagingStatus databases.StagingStatus // staging status + Transfer uuid.NullUUID // file transfer UUID (if any) + TransferStatus TransferStatus // status of file transfer operation + UserInfo auth.UserInfo // info about user requesting transfer +} + +func (subtask *TransferSubtask) start() error { // are the files already staged? (only works for public data) - sourceEndpoint, err := source.Endpoint() + sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) if err != nil { return err } - staged, err := sourceEndpoint.FilesStaged(task.Resources) + staged, err := sourceEndpoint.FilesStaged(subtask.Resources) if err != nil { return err } if staged { - err = task.beginTransfer() + err = subtask.beginTransfer() } else { // tell the source DB to stage the files, stash the task, and return // its new ID - task.Staging.UUID, err = source.StageFiles(task.FileIds) - task.Staging.Valid = true + source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) if err != nil { return err } - task.Status = TransferStatus{ - Code: TransferStatusStaging, - NumFiles: len(task.FileIds), + fileIds := make([]string, len(subtask.Resources)) + for i, resource := range subtask.Resources { + fileIds[i] = resource.Id } - } - return err -} - -// updates the status of a canceled task depending on where it is in its -// lifecycle -func (task *taskType) checkCancellation() error { - if task.Transfer.Valid { - // the task's status is the same as its transfer status - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) + subtask.Staging.UUID, err = source.StageFiles(fileIds) + subtask.Staging.Valid = true if err != nil { return err } - endpoint, err := source.Endpoint() - if err != nil { - return err + subtask.TransferStatus = TransferStatus{ + Code: TransferStatusStaging, + NumFiles: len(subtask.Resources), } - task.Status, err = endpoint.Status(task.Id) - return err - } else { - // at any other point in the lifecycle, terminate the task - task.Status.Code = TransferStatusFailed - task.Status.Message = "Task canceled at user request" } - if task.Completed() { - task.CompletionTime = time.Now() - } - return nil + return err } // initiates a file transfer on a set of staged files -func (task *taskType) beginTransfer() error { - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) +func (subtask *TransferSubtask) beginTransfer() error { + source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) if err != nil { return err } - destination, err := databases.NewDatabase(task.UserInfo.Orcid, task.Destination) + destination, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Destination) if err != nil { return err } - // construct the source/destination file paths - username, err := destination.LocalUser(task.UserInfo.Orcid) - if err != nil { - return err - } - task.DestinationFolder = filepath.Join(username, "dts-"+task.Id.String()) - fileXfers := make([]FileTransfer, len(task.Resources)) - for i, resource := range task.Resources { - destinationPath := filepath.Join(task.DestinationFolder, resource.Path) + fileXfers := make([]FileTransfer, len(subtask.Resources)) + for i, resource := range subtask.Resources { + destinationPath := filepath.Join(subtask.DestinationFolder, resource.Path) fileXfers[i] = FileTransfer{ SourcePath: resource.Path, DestinationPath: destinationPath, @@ -236,53 +172,228 @@ func (task *taskType) beginTransfer() error { if err != nil { return err } - task.Transfer.UUID, err = sourceEndpoint.Transfer(destinationEndpoint, fileXfers) + subtask.Transfer.UUID, err = sourceEndpoint.Transfer(destinationEndpoint, fileXfers) if err != nil { return err } - task.Status = TransferStatus{ + subtask.TransferStatus = TransferStatus{ Code: TransferStatusActive, - NumFiles: len(task.FileIds), + NumFiles: len(subtask.Resources), } - task.Staging = uuid.NullUUID{} - task.Transfer.Valid = true + subtask.Staging = uuid.NullUUID{} + subtask.Transfer.Valid = true return nil } -// checks whether files for a task are finished staging and, if so, +// checks whether files for a subtask are finished staging and, if so, // initiates the transfer process -func (task *taskType) checkStaging() error { - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) +func (subtask *TransferSubtask) checkStaging() error { + source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) if err != nil { return err } // check with the database first to see whether the files are staged - stagingStatus, err := source.StagingStatus(task.Staging.UUID) + subtask.StagingStatus, err = source.StagingStatus(subtask.Staging.UUID) + if err != nil { + return err + } + + if subtask.StagingStatus == databases.StagingStatusSucceeded { // staged! + return subtask.beginTransfer() // move along + } + return nil +} + +// checks whether files for a task are finished transferring and, if so, +// initiates the generation of the file manifest +func (subtask *TransferSubtask) checkTransfer() error { + // has the data transfer completed? + source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) + if err != nil { + return err + } + sourceEndpoint, err := source.Endpoint() + if err != nil { + return err + } + xferStatus, err := sourceEndpoint.Status(subtask.Transfer.UUID) if err != nil { return err } + if xferStatus.Code == TransferStatusSucceeded || + xferStatus.Code == TransferStatusFailed { // transfer finished + subtask.Transfer = uuid.NullUUID{} + } + return nil +} - if stagingStatus == databases.StagingStatusSucceeded { // staged! - return task.beginTransfer() // move along - } else if stagingStatus == databases.StagingStatusFailed { - // staging failed, so cancel the task - task.Cancel() - task.Status.Code = TransferStatusUnknown - task.Status.Message = "task cancelled because of staging failure" +func (subtask *TransferSubtask) cancel() error { + if subtask.Transfer.Valid { // we're transferring + // fetch the source endpoint + endpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + + // request that the task be canceled using its UUID + return endpoint.Cancel(subtask.Transfer.UUID) } return nil } +// updates the status of a canceled subtask depending on where it is in its +// lifecycle +func (subtask *TransferSubtask) checkCancellation() error { + if subtask.Transfer.Valid { + endpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + subtask.TransferStatus, err = endpoint.Status(subtask.Transfer.UUID) + return err + } else { + // at any other point in the lifecycle, terminate the task + subtask.TransferStatus.Code = TransferStatusFailed + subtask.TransferStatus.Message = "Task canceled at user request" + } + return nil +} + +// This type tracks the lifecycle of a file transfer task that copies files from +// a source database to a destination database. A TransferTask can have one or +// more subtasks, depending on how many transfer endpoints are involved. +type TransferTask struct { + Canceled bool // set if a cancellation request has been made + CompletionTime time.Time // time at which the transfer completed + Description string // Markdown description of the task + Destination string // name of destination database (in config) + DestinationFolder string // folder path to which files are transferred + FileIds []string // IDs of all files being transferred + Id uuid.UUID // task identifier + Instructions json.RawMessage // machine-readable task processing instructions + Manifest uuid.NullUUID // manifest generation UUID (if any) + ManifestFile string // name of locally-created manifest file + PayloadSize float64 // Size of payload (gigabytes) + Source string // name of source database (in config) + Status TransferStatus // status of file transfer operation + Subtasks []TransferSubtask // list of constituent file transfer subtasks + UserInfo auth.UserInfo // info about user requesting transfer +} + +// This error type is returned when a payload is requested that is too large. +type PayloadTooLargeError struct { + size float64 // size of the requested payload in gigabytes +} + +func (e PayloadTooLargeError) Error() string { + return fmt.Sprintf("Requested payload is too large: %g GB (limit is %g GB).", + e.size, config.Service.MaxPayloadSize) +} + +// computes the size of a payload for a transfer task (in gigabytes) +func payloadSize(resources []DataResource) float64 { + var size uint64 + for _, resource := range resources { + size += uint64(resource.Bytes) + } + return float64(size) / float64(1024*1024*1024) +} + +// starts a task going, initiating staging if needed +func (task *TransferTask) start() error { + source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) + if err != nil { + return err + } + + // resolve resource data using file IDs + resources, err := source.Resources(task.FileIds) + if err != nil { + return err + } + + // make sure the size of the payload doesn't exceed our specified limit + task.PayloadSize = payloadSize(resources) // (in GB) + if task.PayloadSize > config.Service.MaxPayloadSize { + return &PayloadTooLargeError{size: task.PayloadSize} + } + + // map resource data to relevant endpoints and create subtasks + // FIXME: we need to add a feature to databases to support this! + endpointForResource := make([]string, len(resources)) + + // determine the destination endpoint + // FIXME: this conflicts with our redesign!! + destinationEndpoint := config.Databases[task.Destination].Endpoint + + // construct a destination folder name + destination, err := databases.NewDatabase(task.UserInfo.Orcid, task.Destination) + if err != nil { + return err + } + username, err := destination.LocalUser(task.UserInfo.Orcid) + if err != nil { + return err + } + task.DestinationFolder = filepath.Join(username, "dts-"+task.Id.String()) + + // assemble distinct endpoints and create a subtask for each + distinctEndpoints := make(map[string]interface{}) + for _, endpoint := range endpointForResource { + if _, found := distinctEndpoints[endpoint]; !found { + distinctEndpoints[endpoint] = struct{}{} + } + } + task.Subtasks = make([]TransferSubtask, len(distinctEndpoints)) + for endpoint, _ := range distinctEndpoints { + // pick out the files corresponding to the source endpoint + // NOTE: this is slow, but preserves file ID ordering + resourcesForEndpoint := make([]DataResource, 0) + for i, resource := range resources { + if endpointForResource[i] == endpoint { + resourcesForEndpoint = append(resourcesForEndpoint, resource) + } + } + + // set up a subtask for the endpoint + task.Subtasks = append(task.Subtasks, TransferSubtask{ + Destination: task.Destination, + DestinationEndpoint: destinationEndpoint, + DestinationFolder: task.DestinationFolder, + Resources: resourcesForEndpoint, + Source: task.Source, + SourceEndpoint: endpoint, + UserInfo: task.UserInfo, + }) + } + + // start the subtasks + for _, subtask := range task.Subtasks { + subErr := subtask.start() + if subErr != nil { + err = subErr + } + } + return err +} + // creates a DataPackage that serves as the transfer manifest -func (task *taskType) createManifest() frictionless.DataPackage { +func (task *TransferTask) createManifest() DataPackage { + resources := make([]DataResource, 0) + for _, subtask := range task.Subtasks { + n := len(resources) + resources = resources[:len(subtask.Resources)] + copy(resources[n:], subtask.Resources) + } + manifest := DataPackage{ Name: "manifest", - Resources: make([]DataResource, len(task.Resources)), + Resources: resources, Created: time.Now().Format(time.RFC3339), Profile: "data-package", Keywords: []string{"dts", "manifest"}, - Contributors: []frictionless.Contributor{ + Contributors: []Contributor{ { Title: task.UserInfo.Name, Email: task.UserInfo.Email, @@ -293,7 +404,7 @@ func (task *taskType) createManifest() frictionless.DataPackage { Description: task.Description, Instructions: make(json.RawMessage, len(task.Instructions)), } - copy(manifest.Resources, task.Resources) + copy(manifest.Resources, resources) copy(manifest.Instructions, task.Instructions) // strip URL prefixes from resource paths @@ -305,30 +416,150 @@ func (task *taskType) createManifest() frictionless.DataPackage { return manifest } -// checks whether files for a task are finished transferring and, if so, -// initiates the generation of the file manifest -func (task *taskType) checkTransfer() error { - // has the data transfer completed? - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) - if err != nil { - return err - } - sourceEndpoint, err := source.Endpoint() +// checks whether the file manifest for a task has been generated and, if so, +// marks the task as completed +func (task *TransferTask) checkManifest() error { + // has the manifest transfer completed? + localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) if err != nil { return err } - xferStatus, err := sourceEndpoint.Status(task.Transfer.UUID) + xferStatus, err := localEndpoint.Status(task.Manifest.UUID) if err != nil { return err } if xferStatus.Code == TransferStatusSucceeded || - xferStatus.Code == TransferStatusFailed { // transfer finished - task.Transfer = uuid.NullUUID{} - if xferStatus.Code == TransferStatusSucceeded { + xferStatus.Code == TransferStatusFailed { // manifest transferred + task.Manifest = uuid.NullUUID{} + os.Remove(task.ManifestFile) + task.ManifestFile = "" + task.Status.Code = xferStatus.Code + task.Status.Message = "" + task.CompletionTime = time.Now() + } + return nil +} + +// returns the duration since the task completed (successfully or otherwise), +// or 0 if the task has not completed +func (task TransferTask) Age() time.Duration { + if task.Status.Code == TransferStatusSucceeded || + task.Status.Code == TransferStatusFailed { + return time.Since(task.CompletionTime) + } else { + return time.Duration(0) + } +} + +// returns true if the task has completed (successfully or not), false otherwise +func (task TransferTask) completed() bool { + for _, subtask := range task.Subtasks { + if subtask.TransferStatus.Code != TransferStatusSucceeded && + subtask.TransferStatus.Code != TransferStatusFailed { + return false + } + } + return true +} + +// requests that the task be canceled +func (task *TransferTask) cancel() error { + task.Canceled = true // mark as canceled + + for _, subtask := range task.Subtasks { + subtask.cancel() + } + return nil +} + +// updates the state of a subtask, setting its status as necessary +func (subtask *TransferSubtask) update() error { + var err error + if subtask.Staging.Valid { // we're staging + err = subtask.checkStaging() + } else if subtask.Transfer.Valid { // we're transferring + err = subtask.checkTransfer() + } + return err +} + +// updates the state of a task, setting its status as necessary +func (task *TransferTask) update() error { + var err error + if len(task.Subtasks) == 0 { // new task! + err = task.start() + } else if task.Canceled { // cancellation requested + for _, subtask := range task.Subtasks { + err = subtask.checkCancellation() + } + if task.completed() { + task.CompletionTime = time.Now() + } + } else if task.Manifest.Valid { // we're generating/sending a manifest + err = task.checkManifest() + } else { // update subtasks + + // track subtask failures + var subtaskFailed bool + var failedSubtaskStatus TransferStatus + + // update each subtask and check for failures + subtaskStaging := false + allTransfersSucceeded := true + for _, subtask := range task.Subtasks { + subErr := subtask.update() + // FIXME: vvv is this the right thing to do?? vvv + if subErr != nil { + err = subErr + } + + if subtask.StagingStatus == databases.StagingStatusFailed { + subtaskFailed = true + failedSubtaskStatus.Code = TransferStatusUnknown + failedSubtaskStatus.Message = "task cancelled because of staging failure" + } else if subtask.TransferStatus.Code == TransferStatusFailed { + subtaskFailed = true + failedSubtaskStatus.Code = TransferStatusFailed + failedSubtaskStatus.Message = "task cancelled because of transfer failure" + } + if subtask.TransferStatus.Code != TransferStatusSucceeded { + allTransfersSucceeded = false + } + } + + // if a subtask failed, cancel the task -- otherwise, update the task's + // status based on those of its subtasks + if subtaskFailed { + // overwrite only the error code and message fields + task.Status.Code = failedSubtaskStatus.Code + task.Status.Message = failedSubtaskStatus.Message + task.cancel() + } else { + // accumulate statistics + task.Status.NumFiles = 0 + task.Status.NumFilesTransferred = 0 + task.Status.NumFilesSkipped = 0 + for _, subtask := range task.Subtasks { + if subtask.Staging.Valid { + subtaskStaging = true + } else if subtask.Transfer.Valid { + task.Status.NumFiles += subtask.TransferStatus.NumFiles + task.Status.NumFilesTransferred += subtask.TransferStatus.NumFilesTransferred + task.Status.NumFilesSkipped += subtask.TransferStatus.NumFilesSkipped + } + } + } + + if subtaskStaging && task.Status.NumFiles == 0 { + task.Status = TransferStatus{ + Code: TransferStatusStaging, + } + } else if allTransfersSucceeded { // write a manifest localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) if err != nil { return err } + // generate a manifest for the transfer manifest := task.createManifest() @@ -379,118 +610,38 @@ func (task *taskType) checkTransfer() error { task.Status = TransferStatus{ Code: TransferStatusFinalizing, } - task.Transfer.Valid = false task.Manifest.Valid = true } else { // the transfer failed, so make sure we cancel it in case it's still // trying (because e.g. Globus continues trying transfers for ~3 days!!) - task.Cancel() - } - } - return nil -} - -// checks whether the file manifest for a task has been generated and, if so, -// marks the task as completed -func (task *taskType) checkManifest() error { - // has the manifest transfer completed? - localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) - if err != nil { - return err - } - xferStatus, err := localEndpoint.Status(task.Manifest.UUID) - if err != nil { - return err - } - if xferStatus.Code == TransferStatusSucceeded || - xferStatus.Code == TransferStatusFailed { // manifest transferred - task.Manifest = uuid.NullUUID{} - os.Remove(task.ManifestFile) - task.ManifestFile = "" - task.Status.Code = xferStatus.Code - task.Status.Message = "" - task.CompletionTime = time.Now() - } - return nil -} - -// returns the duration since the task completed (successfully or otherwise), -// or 0 if the task has not completed -func (task taskType) Age() time.Duration { - if task.Status.Code == TransferStatusSucceeded || - task.Status.Code == TransferStatusFailed { - return time.Since(task.CompletionTime) - } else { - return time.Duration(0) - } -} - -// returns true if the task has completed (successfully or not), false otherwise -func (task taskType) Completed() bool { - return task.Status.Code == TransferStatusSucceeded || - task.Status.Code == TransferStatusFailed -} - -// requests that the task be canceled -func (task *taskType) Cancel() error { - task.Canceled = true // mark as canceled - - if task.Transfer.Valid { // we're transferring - // fetch the source endpoint - var endpoint endpoints.Endpoint - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) - if err != nil { - return err + task.cancel() } - endpoint, err = source.Endpoint() - if err != nil { - return err - } - // request that the task be canceled using its UUID - return endpoint.Cancel(task.Transfer.UUID) - } - return nil -} - -// updates the state of a task, setting its status as necessary -func (task *taskType) Update() error { - var err error - if task.Resources == nil { // new task! - err = task.start() - } else if task.Canceled { // cancellation requested - err = task.checkCancellation() - } else if task.Staging.Valid { // we're staging - err = task.checkStaging() - } else if task.Transfer.Valid { // we're transferring - err = task.checkTransfer() - } else if task.Manifest.Valid { // we're generating/sending a manifest - err = task.checkManifest() } return err } // loads a map of task IDs to tasks from a previously saved file if available, // or creates an empty map if no such file is available or valid -func createOrLoadTasks(dataFile string) map[uuid.UUID]taskType { +func createOrLoadTasks(dataFile string) map[uuid.UUID]TransferTask { file, err := os.Open(dataFile) if err != nil { - return make(map[uuid.UUID]taskType) + return make(map[uuid.UUID]TransferTask) } slog.Debug(fmt.Sprintf("Found previous tasks in %s.", dataFile)) defer file.Close() enc := gob.NewDecoder(file) - var tasks map[uuid.UUID]taskType + var tasks map[uuid.UUID]TransferTask err = enc.Decode(&tasks) if err != nil { // file not readable slog.Error(fmt.Sprintf("Reading task file %s: %s", dataFile, err.Error())) - return make(map[uuid.UUID]taskType) + return make(map[uuid.UUID]TransferTask) } slog.Debug(fmt.Sprintf("Restored %d tasks from %s", len(tasks), dataFile)) return tasks } // saves a map of task IDs to tasks to the given file -func saveTasks(tasks map[uuid.UUID]taskType, dataFile string) error { +func saveTasks(tasks map[uuid.UUID]TransferTask, dataFile string) error { if len(tasks) > 0 { slog.Debug(fmt.Sprintf("Saving %d tasks to %s", len(tasks), dataFile)) file, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE, 0644) @@ -522,7 +673,7 @@ func saveTasks(tasks map[uuid.UUID]taskType, dataFile string) error { // this type holds various channels used by the task manager to communicate // with its worker goroutine type channelsType struct { - CreateTask chan taskType // used by client to request task creation + CreateTask chan TransferTask // used by client to request task creation CancelTask chan uuid.UUID // used by client to request task cancellation GetTaskStatus chan uuid.UUID // used by client to request task status ReturnTaskId chan uuid.UUID // returns task ID to client @@ -541,7 +692,7 @@ func processTasks() { tasks := createOrLoadTasks(dataStore) // parse the task channels into directional types as needed - var createTaskChan <-chan taskType = taskChannels.CreateTask + var createTaskChan <-chan TransferTask = taskChannels.CreateTask var cancelTaskChan <-chan uuid.UUID = taskChannels.CancelTask var getTaskStatusChan <-chan uuid.UUID = taskChannels.GetTaskStatus var returnTaskIdChan chan<- uuid.UUID = taskChannels.ReturnTaskId @@ -565,7 +716,7 @@ func processTasks() { case taskId := <-cancelTaskChan: // Cancel() called if task, found := tasks[taskId]; found { slog.Info(fmt.Sprintf("Task %s: received cancellation request", taskId.String())) - err := task.Cancel() + err := task.cancel() if err != nil { task.Status.Code = TransferStatusUnknown task.Status.Message = fmt.Sprintf("error in cancellation: %s", err.Error()) @@ -586,9 +737,9 @@ func processTasks() { } case <-pollChan: // time to move things along for taskId, task := range tasks { - if !task.Completed() { + if !task.completed() { oldStatus := task.Status - err := task.Update() + err := task.update() if err != nil { // We log task update errors but do not propagate them. All // task errors result in a failed status. @@ -601,10 +752,10 @@ func processTasks() { switch task.Status.Code { case TransferStatusStaging: slog.Info(fmt.Sprintf("Task %s: staging %d file(s) (%g GB)", - task.Id.String(), len(task.Resources), task.PayloadSize)) + task.Id.String(), len(task.FileIds), task.PayloadSize)) case TransferStatusActive: slog.Info(fmt.Sprintf("Task %s: beginning transfer (%d file(s), %g GB)", - task.Id.String(), len(task.Resources), task.PayloadSize)) + task.Id.String(), len(task.FileIds), task.PayloadSize)) case TransferStatusInactive: slog.Info(fmt.Sprintf("Task %s: suspended transfer", task.Id.String())) case TransferStatusFinalizing: @@ -762,7 +913,7 @@ func Start() error { // allocate channels taskChannels = channelsType{ - CreateTask: make(chan taskType, 32), + CreateTask: make(chan TransferTask, 32), CancelTask: make(chan uuid.UUID, 32), GetTaskStatus: make(chan uuid.UUID, 32), ReturnTaskId: make(chan uuid.UUID, 32), @@ -825,7 +976,7 @@ func Create(spec Specification) (uuid.UUID, error) { } // create a new task and send it along for processing - taskChannels.CreateTask <- taskType{ + taskChannels.CreateTask <- TransferTask{ UserInfo: spec.UserInfo, Source: spec.Source, Destination: spec.Destination,