From c651ac98049aad7337a0c560ee57c3422e36f643 Mon Sep 17 00:00:00 2001 From: "Jeffrey N. Johnson" Date: Thu, 7 Nov 2024 09:35:42 -0800 Subject: [PATCH] A bit of reorganization and generalization of database endpoints. --- config/config.go | 6 +- config/database_config.go | 8 +- databases/databases.go | 12 +- databases/jdp/database.go | 20 +- databases/kbase/database.go | 6 - databases/nmdc/database.go | 21 +- frictionless/frictionless.go | 2 + tasks/subtask.go | 203 +++++++++++++ tasks/task.go | 383 +++++++++++++++++++++++++ tasks/tasks.go | 534 ----------------------------------- 10 files changed, 638 insertions(+), 557 deletions(-) create mode 100644 tasks/subtask.go create mode 100644 tasks/task.go diff --git a/config/config.go b/config/config.go index 092f5e12..18956a3f 100644 --- a/config/config.go +++ b/config/config.go @@ -146,8 +146,10 @@ func validateEndpoints(endpoints map[string]endpointConfig) error { func validateDatabases(databases map[string]databaseConfig) error { for name, db := range databases { - if db.Endpoint == "" { - return fmt.Errorf("No endpoint given for database '%s'", name) + if db.Endpoint == "" && len(db.Endpoints) == 0 { + return fmt.Errorf("No endpoints given for database '%s'", name) + } else if db.Endpoint != "" && len(db.Endpoints) > 0 { + return fmt.Errorf("Database '%s' may have EITHER endpoint OR endpoints specified, but not both", name) } } return nil diff --git a/config/database_config.go b/config/database_config.go index 6a15487d..2e0640e5 100644 --- a/config/database_config.go +++ b/config/database_config.go @@ -27,6 +27,10 @@ type databaseConfig struct { Name string `yaml:"name"` // the name of the organization hosting the database Organization string `yaml:"organization"` - // the name of an endpoint for this database - Endpoint string `yaml:"endpoint"` + // if set, the name of the single endpoint available to this database + // (only one of Endpoint and Endpoints may be set) + Endpoint string `yaml:"endpoint,omitempty"` 
+ // if set, a set of endpoints assigned functional names, available to this + // database (only one of Endpoint and Endpoints may be set) + Endpoints map[string]string `yaml:"endpoints,omitempty"` } diff --git a/databases/databases.go b/databases/databases.go index 0dbe1876..6405f145 100644 --- a/databases/databases.go +++ b/databases/databases.go @@ -27,7 +27,6 @@ import ( "github.com/google/uuid" - "github.com/kbase/dts/endpoints" "github.com/kbase/dts/frictionless" ) @@ -96,8 +95,6 @@ type Database interface { StageFiles(fileIds []string) (uuid.UUID, error) // returns the status of a given staging operation StagingStatus(id uuid.UUID) (StagingStatus, error) - // returns the endpoint associated with this database - Endpoint() (endpoints.Endpoint, error) // returns the local username associated with the given Orcid ID LocalUser(orcid string) (string, error) } @@ -141,6 +138,15 @@ func (e InvalidSearchParameter) Error() string { return fmt.Sprintf("Invalid search parameter for database %s: %s", e.Database, e.Message) } +// this error type is returned when a database's endpoint configuration is invalid +type InvalidEndpointsError struct { + Database, Message string +} + +func (e InvalidEndpointsError) Error() string { + return fmt.Sprintf("Invalid endpoint configuration for database %s: %s", e.Database, e.Message) +} + // we maintain a table of database instances, identified by their names var allDatabases = make(map[string]Database) diff --git a/databases/jdp/database.go b/databases/jdp/database.go index 9b19b6a6..bfed5222 100644 --- a/databases/jdp/database.go +++ b/databases/jdp/database.go @@ -41,7 +41,6 @@ import ( "github.com/kbase/dts/config" "github.com/kbase/dts/credit" "github.com/kbase/dts/databases" - "github.com/kbase/dts/endpoints" "github.com/kbase/dts/frictionless" ) @@ -359,6 +358,14 @@ func NewDatabase(orcid string) (databases.Database, error) { } } + // make sure we are using only a single endpoint + if config.Databases["jdp"].Endpoint == "" { +
return nil, databases.InvalidEndpointsError{ + Database: "jdp", + Message: "The JGI data portal should only have a single endpoint configured.", + } + } + return &Database{ Id: "jdp", Orcid: orcid, @@ -641,6 +648,10 @@ func (db *Database) Search(params databases.SearchParameters) (databases.SearchR } func (db *Database) Resources(fileIds []string) ([]frictionless.DataResource, error) { + // the JDP only uses a single Globus endpoint, so we can associate all + // resources with this endpoint + resourceEndpoint := config.Databases["jdp"].Endpoint + // strip the "JDP:" prefix from our files and create a mapping from IDs to // their original order so we can hand back metadata accordingly strippedFileIds := make([]string, len(fileIds)) @@ -728,6 +739,9 @@ func (db *Database) Resources(fileIds []string) ([]frictionless.DataResource, er // NOTE: solution resources[index].Format = formatFromFileName(resources[index].Path) resources[index].MediaType = mimeTypeFromFormatAndTypes(resources[index].Format, []string{}) + + // set the endpoint for the resource + resources[index].Endpoint = resourceEndpoint } return resources, err } @@ -825,10 +839,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error) } } -func (db *Database) Endpoint() (endpoints.Endpoint, error) { - return endpoints.NewEndpoint(config.Databases[db.Id].Endpoint) -} - func (db *Database) LocalUser(orcid string) (string, error) { // no current mechanism for this return "localuser", nil diff --git a/databases/kbase/database.go b/databases/kbase/database.go index 4b0bc488..a3367e91 100644 --- a/databases/kbase/database.go +++ b/databases/kbase/database.go @@ -27,9 +27,7 @@ import ( "github.com/google/uuid" "github.com/kbase/dts/auth" - "github.com/kbase/dts/config" "github.com/kbase/dts/databases" - "github.com/kbase/dts/endpoints" "github.com/kbase/dts/frictionless" ) @@ -74,10 +72,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error) return 
databases.StagingStatusUnknown, err } -func (db *Database) Endpoint() (endpoints.Endpoint, error) { - return endpoints.NewEndpoint(config.Databases[db.Id].Endpoint) -} - func (db *Database) LocalUser(orcid string) (string, error) { // for KBase user federation, we rely on a table maintained by our KBase // auth server proxy diff --git a/databases/nmdc/database.go b/databases/nmdc/database.go index 0d7a79e5..5a6adac1 100644 --- a/databases/nmdc/database.go +++ b/databases/nmdc/database.go @@ -39,7 +39,6 @@ import ( "github.com/kbase/dts/config" "github.com/kbase/dts/credit" "github.com/kbase/dts/databases" - "github.com/kbase/dts/endpoints" "github.com/kbase/dts/frictionless" ) @@ -220,6 +219,22 @@ func NewDatabase(orcid string) (databases.Database, error) { } */ + // check for "nersc" and "emsl" Globus endpoints + if config.Databases["nmdc"].Endpoint != "" { + return nil, databases.InvalidEndpointsError{ + Database: "nmdc", + Message: "NMDC requires \"nersc\" and \"emsl\" endpoints to be specified", + } + } + for _, functionalName := range []string{"nersc", "emsl"} { + if _, found := config.Databases["nmdc"].Endpoints[functionalName]; !found { + return nil, databases.InvalidEndpointsError{ + Database: "nmdc", + Message: fmt.Sprintf("Could not find \"%s\" endpoint for NMDC database", functionalName), + } + } + } + return &Database{ Id: "nmdc", Orcid: orcid, @@ -811,10 +826,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error) return databases.StagingStatusSucceeded, nil } -func (db *Database) Endpoint() (endpoints.Endpoint, error) { - return endpoints.NewEndpoint(config.Databases[db.Id].Endpoint) -} - func (db *Database) LocalUser(orcid string) (string, error) { // no current mechanism for this return "localuser", nil diff --git a/frictionless/frictionless.go b/frictionless/frictionless.go index 90300e99..b0fad1f5 100644 --- a/frictionless/frictionless.go +++ b/frictionless/frictionless.go @@ -98,6 +98,8 @@ type DataResource struct {
Sources []DataSource `json:"sources,omitempty"` // a title or label for the resource (optional) Title string `json:"title,omitempty"` + // the name of the endpoint at which this resource is accessed (not exposed to JSON) + Endpoint string `json:"-"` } // call this to get a string containing the name of the hashing algorithm used diff --git a/tasks/subtask.go b/tasks/subtask.go new file mode 100644 index 00000000..c80b249b --- /dev/null +++ b/tasks/subtask.go @@ -0,0 +1,203 @@ +// Copyright (c) 2023 The KBase Project and its Contributors +// Copyright (c) 2023 Cohere Consulting, LLC +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +// of the Software, and to permit persons to whom the Software is furnished to do +// so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package tasks + +import ( + "path/filepath" + + "github.com/google/uuid" + + "github.com/kbase/dts/auth" + "github.com/kbase/dts/databases" + "github.com/kbase/dts/endpoints" +) + +// This type tracks subtasks within a transfer (e.g.
files transferred from +// multiple endpoints attached to a single source/destination database pair). +// It holds multiple (possibly null) UUIDs corresponding to different +// states in the file transfer lifecycle +type TransferSubtask struct { + Destination string // name of destination database (in config) + DestinationEndpoint string // name of destination endpoint (in config) + DestinationFolder string // folder path to which files are transferred + Resources []DataResource // Frictionless DataResources for files + Source string // name of source database (in config) + SourceEndpoint string // name of source endpoint (in config) + Staging uuid.NullUUID // staging UUID (if any) + StagingStatus databases.StagingStatus // staging status + Transfer uuid.NullUUID // file transfer UUID (if any) + TransferStatus TransferStatus // status of file transfer operation + UserInfo auth.UserInfo // info about user requesting transfer +} + +func (subtask *TransferSubtask) start() error { + // are the files already staged?
(only works for public data) + sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + staged, err := sourceEndpoint.FilesStaged(subtask.Resources) + if err != nil { + return err + } + + if staged { + err = subtask.beginTransfer() + } else { + // tell the source DB to stage the files, stash the task, and return + // its new ID + source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) + if err != nil { + return err + } + fileIds := make([]string, len(subtask.Resources)) + for i, resource := range subtask.Resources { + fileIds[i] = resource.Id + } + subtask.Staging.UUID, err = source.StageFiles(fileIds) + subtask.Staging.Valid = true + if err != nil { + return err + } + subtask.TransferStatus = TransferStatus{ + Code: TransferStatusStaging, + NumFiles: len(subtask.Resources), + } + } + return err +} + +// initiates a file transfer on a set of staged files +func (subtask *TransferSubtask) beginTransfer() error { + fileXfers := make([]FileTransfer, len(subtask.Resources)) + for i, resource := range subtask.Resources { + destinationPath := filepath.Join(subtask.DestinationFolder, resource.Path) + fileXfers[i] = FileTransfer{ + SourcePath: resource.Path, + DestinationPath: destinationPath, + Hash: resource.Hash, + } + } + + // initiate the transfer + sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + destinationEndpoint, err := endpoints.NewEndpoint(subtask.DestinationEndpoint) + if err != nil { + return err + } + subtask.Transfer.UUID, err = sourceEndpoint.Transfer(destinationEndpoint, fileXfers) + if err != nil { + return err + } + + subtask.TransferStatus = TransferStatus{ + Code: TransferStatusActive, + NumFiles: len(subtask.Resources), + } + subtask.Staging = uuid.NullUUID{} + subtask.Transfer.Valid = true + return nil +} + +// checks whether files for a subtask are finished staging and, if so, +// initiates the transfer process +func 
(subtask *TransferSubtask) checkStaging() error { + source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) + if err != nil { + return err + } + // check with the database first to see whether the files are staged + subtask.StagingStatus, err = source.StagingStatus(subtask.Staging.UUID) + if err != nil { + return err + } + + if subtask.StagingStatus == databases.StagingStatusSucceeded { // staged! + return subtask.beginTransfer() // move along + } + return nil +} + +// checks whether files for a task are finished transferring and, if so, +// initiates the generation of the file manifest +func (subtask *TransferSubtask) checkTransfer() error { + // has the data transfer completed? + sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + xferStatus, err := sourceEndpoint.Status(subtask.Transfer.UUID) + if err != nil { + return err + } + if xferStatus.Code == TransferStatusSucceeded || + xferStatus.Code == TransferStatusFailed { // transfer finished + subtask.Transfer = uuid.NullUUID{} + } + return nil +} + +func (subtask *TransferSubtask) cancel() error { + if subtask.Transfer.Valid { // we're transferring + // fetch the source endpoint + endpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + + // request that the task be canceled using its UUID + return endpoint.Cancel(subtask.Transfer.UUID) + } + return nil +} + +// updates the status of a canceled subtask depending on where it is in its +// lifecycle +func (subtask *TransferSubtask) checkCancellation() error { + if subtask.Transfer.Valid { + endpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) + if err != nil { + return err + } + subtask.TransferStatus, err = endpoint.Status(subtask.Transfer.UUID) + return err + } else { + // at any other point in the lifecycle, terminate the task + subtask.TransferStatus.Code = TransferStatusFailed + subtask.TransferStatus.Message = "Task canceled at user 
request" + } + return nil +} + +// updates the state of a subtask, setting its status as necessary +func (subtask *TransferSubtask) update() error { + var err error + if subtask.Staging.Valid { // we're staging + err = subtask.checkStaging() + } else if subtask.Transfer.Valid { // we're transferring + err = subtask.checkTransfer() + } + return err +} diff --git a/tasks/task.go b/tasks/task.go new file mode 100644 index 00000000..95aa8822 --- /dev/null +++ b/tasks/task.go @@ -0,0 +1,383 @@ +// Copyright (c) 2023 The KBase Project and its Contributors +// Copyright (c) 2023 Cohere Consulting, LLC +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +// of the Software, and to permit persons to whom the Software is furnished to do +// so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package tasks + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "regexp" + "time" + + "github.com/google/uuid" + + "github.com/kbase/dts/auth" + "github.com/kbase/dts/config" + "github.com/kbase/dts/databases" + "github.com/kbase/dts/endpoints" +) + +// This type tracks the lifecycle of a file transfer task that copies files from +// a source database to a destination database. A TransferTask can have one or +// more subtasks, depending on how many transfer endpoints are involved. +type TransferTask struct { + Canceled bool // set if a cancellation request has been made + CompletionTime time.Time // time at which the transfer completed + Description string // Markdown description of the task + Destination string // name of destination database (in config) + DestinationFolder string // folder path to which files are transferred + FileIds []string // IDs of all files being transferred + Id uuid.UUID // task identifier + Instructions json.RawMessage // machine-readable task processing instructions + Manifest uuid.NullUUID // manifest generation UUID (if any) + ManifestFile string // name of locally-created manifest file + PayloadSize float64 // Size of payload (gigabytes) + Source string // name of source database (in config) + Status TransferStatus // status of file transfer operation + Subtasks []TransferSubtask // list of constituent file transfer subtasks + UserInfo auth.UserInfo // info about user requesting transfer +} + +// This error type is returned when a payload is requested that is too large. 
+type PayloadTooLargeError struct { + size float64 // size of the requested payload in gigabytes +} + +func (e PayloadTooLargeError) Error() string { + return fmt.Sprintf("Requested payload is too large: %g GB (limit is %g GB).", + e.size, config.Service.MaxPayloadSize) +} + +// computes the size of a payload for a transfer task (in gigabytes) +func payloadSize(resources []DataResource) float64 { + var size uint64 + for _, resource := range resources { + size += uint64(resource.Bytes) + } + return float64(size) / float64(1024*1024*1024) +} + +// starts a task going, initiating staging if needed +func (task *TransferTask) start() error { + source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) + if err != nil { + return err + } + + // resolve resource data using file IDs + resources, err := source.Resources(task.FileIds) + if err != nil { + return err + } + + // make sure the size of the payload doesn't exceed our specified limit + task.PayloadSize = payloadSize(resources) // (in GB) + if task.PayloadSize > config.Service.MaxPayloadSize { + return &PayloadTooLargeError{size: task.PayloadSize} + } + + // determine the destination endpoint + // FIXME: this conflicts with our redesign!! 
+ destinationEndpoint := config.Databases[task.Destination].Endpoint + + // construct a destination folder name + destination, err := databases.NewDatabase(task.UserInfo.Orcid, task.Destination) + if err != nil { + return err + } + username, err := destination.LocalUser(task.UserInfo.Orcid) + if err != nil { + return err + } + task.DestinationFolder = filepath.Join(username, "dts-"+task.Id.String()) + + // assemble distinct endpoints and create a subtask for each + distinctEndpoints := make(map[string]interface{}) + for _, resource := range resources { + if _, found := distinctEndpoints[resource.Endpoint]; !found { + distinctEndpoints[resource.Endpoint] = struct{}{} + } + } + task.Subtasks = make([]TransferSubtask, 0, len(distinctEndpoints)) + for endpoint := range distinctEndpoints { + // pick out the files corresponding to the source endpoint + // NOTE: this is slow, but preserves file ID ordering + resourcesForEndpoint := make([]DataResource, 0) + for _, resource := range resources { + if resource.Endpoint == endpoint { + resourcesForEndpoint = append(resourcesForEndpoint, resource) + } + } + + // set up a subtask for the endpoint + task.Subtasks = append(task.Subtasks, TransferSubtask{ + Destination: task.Destination, + DestinationEndpoint: destinationEndpoint, + DestinationFolder: task.DestinationFolder, + Resources: resourcesForEndpoint, + Source: task.Source, + SourceEndpoint: endpoint, + UserInfo: task.UserInfo, + }) + } + + // start the subtasks + for i := range task.Subtasks { + subErr := task.Subtasks[i].start() + if subErr != nil { + err = subErr + } + } + return err +} + +// creates a DataPackage that serves as the transfer manifest +func (task *TransferTask) createManifest() DataPackage { + resources := make([]DataResource, 0) + for _, subtask := range task.Subtasks { + // append rather than reslice: resources starts with zero capacity, + // so slicing it beyond its length would panic + resources = append(resources, subtask.Resources...) + } + + manifest := DataPackage{ + Name: "manifest", + Resources: resources,
+ Created: time.Now().Format(time.RFC3339), + Profile: "data-package", + Keywords: []string{"dts", "manifest"}, + Contributors: []Contributor{ + { + Title: task.UserInfo.Name, + Email: task.UserInfo.Email, + Role: "author", + Organization: task.UserInfo.Organization, + }, + }, + Description: task.Description, + Instructions: make(json.RawMessage, len(task.Instructions)), + } + copy(manifest.Resources, resources) + copy(manifest.Instructions, task.Instructions) + + // strip URL prefixes from resource paths + hostPattern, _ := regexp.Compile(`^https:\/\/.+\/`) + for i, resource := range manifest.Resources { + manifest.Resources[i].Path = hostPattern.ReplaceAllLiteralString(resource.Path, "") + } + + return manifest +} + +// checks whether the file manifest for a task has been generated and, if so, +// marks the task as completed +func (task *TransferTask) checkManifest() error { + // has the manifest transfer completed? + localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) + if err != nil { + return err + } + xferStatus, err := localEndpoint.Status(task.Manifest.UUID) + if err != nil { + return err + } + if xferStatus.Code == TransferStatusSucceeded || + xferStatus.Code == TransferStatusFailed { // manifest transferred + task.Manifest = uuid.NullUUID{} + os.Remove(task.ManifestFile) + task.ManifestFile = "" + task.Status.Code = xferStatus.Code + task.Status.Message = "" + task.CompletionTime = time.Now() + } + return nil +} + +// returns the duration since the task completed (successfully or otherwise), +// or 0 if the task has not completed +func (task TransferTask) Age() time.Duration { + if task.Status.Code == TransferStatusSucceeded || + task.Status.Code == TransferStatusFailed { + return time.Since(task.CompletionTime) + } else { + return time.Duration(0) + } +} + +// returns true if the task has completed (successfully or not), false otherwise +func (task TransferTask) completed() bool { + for _, subtask := range task.Subtasks { + if 
subtask.TransferStatus.Code != TransferStatusSucceeded && + subtask.TransferStatus.Code != TransferStatusFailed { + return false + } + } + return true +} + +// requests that the task be canceled +func (task *TransferTask) cancel() error { + task.Canceled = true // mark as canceled + for _, subtask := range task.Subtasks { // cancel subtasks + subtask.cancel() + } + return nil +} + +// updates the state of a task, setting its status as necessary +func (task *TransferTask) update() error { + var err error + if len(task.Subtasks) == 0 { // new task! + err = task.start() + } else if task.Canceled { // cancellation requested + for i := range task.Subtasks { + err = task.Subtasks[i].checkCancellation() + } + if task.completed() { + task.CompletionTime = time.Now() + } + } else if task.Manifest.Valid { // we're generating/sending a manifest + err = task.checkManifest() + } else { // update subtasks + + // track subtask failures + var subtaskFailed bool + var failedSubtaskStatus TransferStatus + + // update each subtask and check for failures + subtaskStaging := false + allTransfersSucceeded := true + for i := range task.Subtasks { + subtask := &task.Subtasks[i] + subErr := subtask.update() // FIXME: vvv is this the right thing to do??
vvv + if subErr != nil { + err = subErr + } + + if subtask.StagingStatus == databases.StagingStatusFailed { + subtaskFailed = true + failedSubtaskStatus.Code = TransferStatusUnknown + failedSubtaskStatus.Message = "task cancelled because of staging failure" + } else if subtask.TransferStatus.Code == TransferStatusFailed { + subtaskFailed = true + failedSubtaskStatus.Code = TransferStatusFailed + failedSubtaskStatus.Message = "task cancelled because of transfer failure" + } + if subtask.TransferStatus.Code != TransferStatusSucceeded { + allTransfersSucceeded = false + } + } + + // if a subtask failed, cancel the task -- otherwise, update the task's + // status based on those of its subtasks + if subtaskFailed { + // overwrite only the error code and message fields + task.Status.Code = failedSubtaskStatus.Code + task.Status.Message = failedSubtaskStatus.Message + task.cancel() + } else { + // accumulate statistics + task.Status.NumFiles = 0 + task.Status.NumFilesTransferred = 0 + task.Status.NumFilesSkipped = 0 + for _, subtask := range task.Subtasks { + if subtask.Staging.Valid { + subtaskStaging = true + } else if subtask.Transfer.Valid { + task.Status.NumFiles += subtask.TransferStatus.NumFiles + task.Status.NumFilesTransferred += subtask.TransferStatus.NumFilesTransferred + task.Status.NumFilesSkipped += subtask.TransferStatus.NumFilesSkipped + } + } + } + + if subtaskStaging && task.Status.NumFiles == 0 { + task.Status = TransferStatus{ + Code: TransferStatusStaging, + } + } else if allTransfersSucceeded { // write a manifest + localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) + if err != nil { + return err + } + + // generate a manifest for the transfer + manifest := task.createManifest() + + // write the manifest to disk and begin transferring it to the + // destination endpoint + var manifestBytes []byte + manifestBytes, err = json.Marshal(manifest) + if err != nil { + return fmt.Errorf("marshalling manifest content: %s", err.Error()) + } + 
task.ManifestFile = filepath.Join(config.Service.ManifestDirectory, + fmt.Sprintf("manifest-%s.json", task.Id.String())) + manifestFile, err := os.Create(task.ManifestFile) + if err != nil { + return fmt.Errorf("creating manifest file: %s", err.Error()) + } + _, err = manifestFile.Write(manifestBytes) + if err != nil { + return fmt.Errorf("writing manifest file content: %s", err.Error()) + } + err = manifestFile.Close() + if err != nil { + return fmt.Errorf("closing manifest file: %s", err.Error()) + } + + // construct the source/destination file manifest paths + fileXfers := []FileTransfer{ + { + SourcePath: task.ManifestFile, + DestinationPath: filepath.Join(task.DestinationFolder, "manifest.json"), + }, + } + + // begin transferring the manifest + // FIXME: how do we determine the database's destination endpoint? + destinationEndpointName, found := config.Databases[task.Destination].Endpoints["destination"] + if !found { + destinationEndpointName = config.Databases[task.Destination].Endpoint + } + destinationEndpoint, err := endpoints.NewEndpoint(destinationEndpointName) + if err != nil { + return err + } + task.Manifest.UUID, err = localEndpoint.Transfer(destinationEndpoint, fileXfers) + if err != nil { + return fmt.Errorf("transferring manifest file: %s", err.Error()) + } + + task.Status = TransferStatus{ + Code: TransferStatusFinalizing, + } + task.Manifest.Valid = true + } else { + // the transfer failed, so make sure we cancel it in case it's still + // trying (because e.g. Globus continues trying transfers for ~3 days!!) + task.cancel() + } + } + return err +} diff --git a/tasks/tasks.go b/tasks/tasks.go index 60a95dd7..f9c30bf2 100644 --- a/tasks/tasks.go +++ b/tasks/tasks.go @@ -31,7 +31,6 @@ import ( "log/slog" "os" "path/filepath" - "regexp" "time" "github.com/google/uuid" @@ -87,539 +86,6 @@ type Specification struct { UserInfo auth.UserInfo } -// This type tracks subtasks within a transfer (e.g. 
files transferred from -// multiple endpoints attached to a single source/destination database pair). -// It holds multiple (possibly null) UUIDs corresponding to different -// states in the file transfer lifecycle -type TransferSubtask struct { - Destination string // name of destination database (in config) - DestinationEndpoint string // name of destination database (in config) - DestinationFolder string // folder path to which files are transferred - Resources []DataResource // Frictionless DataResources for files - Source string // name of source database (in config) - SourceEndpoint string // name of source endpoint (in config) - Staging uuid.NullUUID // staging UUID (if any) - StagingStatus databases.StagingStatus // staging status - Transfer uuid.NullUUID // file transfer UUID (if any) - TransferStatus TransferStatus // status of file transfer operation - UserInfo auth.UserInfo // info about user requesting transfer -} - -func (subtask *TransferSubtask) start() error { - // are the files already staged? 
(only works for public data) - sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) - if err != nil { - return err - } - staged, err := sourceEndpoint.FilesStaged(subtask.Resources) - if err != nil { - return err - } - - if staged { - err = subtask.beginTransfer() - } else { - // tell the source DB to stage the files, stash the task, and return - // its new ID - source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) - if err != nil { - return err - } - fileIds := make([]string, len(subtask.Resources)) - for i, resource := range subtask.Resources { - fileIds[i] = resource.Id - } - subtask.Staging.UUID, err = source.StageFiles(fileIds) - subtask.Staging.Valid = true - if err != nil { - return err - } - subtask.TransferStatus = TransferStatus{ - Code: TransferStatusStaging, - NumFiles: len(subtask.Resources), - } - } - return err -} - -// initiates a file transfer on a set of staged files -func (subtask *TransferSubtask) beginTransfer() error { - source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) - if err != nil { - return err - } - destination, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Destination) - if err != nil { - return err - } - - fileXfers := make([]FileTransfer, len(subtask.Resources)) - for i, resource := range subtask.Resources { - destinationPath := filepath.Join(subtask.DestinationFolder, resource.Path) - fileXfers[i] = FileTransfer{ - SourcePath: resource.Path, - DestinationPath: destinationPath, - Hash: resource.Hash, - } - } - - // initiate the transfer - sourceEndpoint, err := source.Endpoint() - if err != nil { - return err - } - destinationEndpoint, err := destination.Endpoint() - if err != nil { - return err - } - subtask.Transfer.UUID, err = sourceEndpoint.Transfer(destinationEndpoint, fileXfers) - if err != nil { - return err - } - - subtask.TransferStatus = TransferStatus{ - Code: TransferStatusActive, - NumFiles: len(subtask.Resources), - } - subtask.Staging = 
uuid.NullUUID{} - subtask.Transfer.Valid = true - return nil -} - -// checks whether files for a subtask are finished staging and, if so, -// initiates the transfer process -func (subtask *TransferSubtask) checkStaging() error { - source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) - if err != nil { - return err - } - // check with the database first to see whether the files are staged - subtask.StagingStatus, err = source.StagingStatus(subtask.Staging.UUID) - if err != nil { - return err - } - - if subtask.StagingStatus == databases.StagingStatusSucceeded { // staged! - return subtask.beginTransfer() // move along - } - return nil -} - -// checks whether files for a task are finished transferring and, if so, -// initiates the generation of the file manifest -func (subtask *TransferSubtask) checkTransfer() error { - // has the data transfer completed? - source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source) - if err != nil { - return err - } - sourceEndpoint, err := source.Endpoint() - if err != nil { - return err - } - xferStatus, err := sourceEndpoint.Status(subtask.Transfer.UUID) - if err != nil { - return err - } - if xferStatus.Code == TransferStatusSucceeded || - xferStatus.Code == TransferStatusFailed { // transfer finished - subtask.Transfer = uuid.NullUUID{} - } - return nil -} - -func (subtask *TransferSubtask) cancel() error { - if subtask.Transfer.Valid { // we're transferring - // fetch the source endpoint - endpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) - if err != nil { - return err - } - - // request that the task be canceled using its UUID - return endpoint.Cancel(subtask.Transfer.UUID) - } - return nil -} - -// updates the status of a canceled subtask depending on where it is in its -// lifecycle -func (subtask *TransferSubtask) checkCancellation() error { - if subtask.Transfer.Valid { - endpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint) - if err != nil { - return err - } - 
subtask.TransferStatus, err = endpoint.Status(subtask.Transfer.UUID) - return err - } else { - // at any other point in the lifecycle, terminate the task - subtask.TransferStatus.Code = TransferStatusFailed - subtask.TransferStatus.Message = "Task canceled at user request" - } - return nil -} - -// This type tracks the lifecycle of a file transfer task that copies files from -// a source database to a destination database. A TransferTask can have one or -// more subtasks, depending on how many transfer endpoints are involved. -type TransferTask struct { - Canceled bool // set if a cancellation request has been made - CompletionTime time.Time // time at which the transfer completed - Description string // Markdown description of the task - Destination string // name of destination database (in config) - DestinationFolder string // folder path to which files are transferred - FileIds []string // IDs of all files being transferred - Id uuid.UUID // task identifier - Instructions json.RawMessage // machine-readable task processing instructions - Manifest uuid.NullUUID // manifest generation UUID (if any) - ManifestFile string // name of locally-created manifest file - PayloadSize float64 // Size of payload (gigabytes) - Source string // name of source database (in config) - Status TransferStatus // status of file transfer operation - Subtasks []TransferSubtask // list of constituent file transfer subtasks - UserInfo auth.UserInfo // info about user requesting transfer -} - -// This error type is returned when a payload is requested that is too large. 
-type PayloadTooLargeError struct { - size float64 // size of the requested payload in gigabytes -} - -func (e PayloadTooLargeError) Error() string { - return fmt.Sprintf("Requested payload is too large: %g GB (limit is %g GB).", - e.size, config.Service.MaxPayloadSize) -} - -// computes the size of a payload for a transfer task (in gigabytes) -func payloadSize(resources []DataResource) float64 { - var size uint64 - for _, resource := range resources { - size += uint64(resource.Bytes) - } - return float64(size) / float64(1024*1024*1024) -} - -// starts a task going, initiating staging if needed -func (task *TransferTask) start() error { - source, err := databases.NewDatabase(task.UserInfo.Orcid, task.Source) - if err != nil { - return err - } - - // resolve resource data using file IDs - resources, err := source.Resources(task.FileIds) - if err != nil { - return err - } - - // make sure the size of the payload doesn't exceed our specified limit - task.PayloadSize = payloadSize(resources) // (in GB) - if task.PayloadSize > config.Service.MaxPayloadSize { - return &PayloadTooLargeError{size: task.PayloadSize} - } - - // map resource data to relevant endpoints and create subtasks - // FIXME: we need to add a feature to databases to support this! - endpointForResource := make([]string, len(resources)) - - // determine the destination endpoint - // FIXME: this conflicts with our redesign!! 
- destinationEndpoint := config.Databases[task.Destination].Endpoint - - // construct a destination folder name - destination, err := databases.NewDatabase(task.UserInfo.Orcid, task.Destination) - if err != nil { - return err - } - username, err := destination.LocalUser(task.UserInfo.Orcid) - if err != nil { - return err - } - task.DestinationFolder = filepath.Join(username, "dts-"+task.Id.String()) - - // assemble distinct endpoints and create a subtask for each - distinctEndpoints := make(map[string]interface{}) - for _, endpoint := range endpointForResource { - if _, found := distinctEndpoints[endpoint]; !found { - distinctEndpoints[endpoint] = struct{}{} - } - } - task.Subtasks = make([]TransferSubtask, len(distinctEndpoints)) - for endpoint, _ := range distinctEndpoints { - // pick out the files corresponding to the source endpoint - // NOTE: this is slow, but preserves file ID ordering - resourcesForEndpoint := make([]DataResource, 0) - for i, resource := range resources { - if endpointForResource[i] == endpoint { - resourcesForEndpoint = append(resourcesForEndpoint, resource) - } - } - - // set up a subtask for the endpoint - task.Subtasks = append(task.Subtasks, TransferSubtask{ - Destination: task.Destination, - DestinationEndpoint: destinationEndpoint, - DestinationFolder: task.DestinationFolder, - Resources: resourcesForEndpoint, - Source: task.Source, - SourceEndpoint: endpoint, - UserInfo: task.UserInfo, - }) - } - - // start the subtasks - for _, subtask := range task.Subtasks { - subErr := subtask.start() - if subErr != nil { - err = subErr - } - } - return err -} - -// creates a DataPackage that serves as the transfer manifest -func (task *TransferTask) createManifest() DataPackage { - resources := make([]DataResource, 0) - for _, subtask := range task.Subtasks { - n := len(resources) - resources = resources[:len(subtask.Resources)] - copy(resources[n:], subtask.Resources) - } - - manifest := DataPackage{ - Name: "manifest", - Resources: resources, 
- Created: time.Now().Format(time.RFC3339), - Profile: "data-package", - Keywords: []string{"dts", "manifest"}, - Contributors: []Contributor{ - { - Title: task.UserInfo.Name, - Email: task.UserInfo.Email, - Role: "author", - Organization: task.UserInfo.Organization, - }, - }, - Description: task.Description, - Instructions: make(json.RawMessage, len(task.Instructions)), - } - copy(manifest.Resources, resources) - copy(manifest.Instructions, task.Instructions) - - // strip URL prefixes from resource paths - hostPattern, _ := regexp.Compile(`^https:\/\/.+\/`) - for i, resource := range manifest.Resources { - manifest.Resources[i].Path = hostPattern.ReplaceAllLiteralString(resource.Path, "") - } - - return manifest -} - -// checks whether the file manifest for a task has been generated and, if so, -// marks the task as completed -func (task *TransferTask) checkManifest() error { - // has the manifest transfer completed? - localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) - if err != nil { - return err - } - xferStatus, err := localEndpoint.Status(task.Manifest.UUID) - if err != nil { - return err - } - if xferStatus.Code == TransferStatusSucceeded || - xferStatus.Code == TransferStatusFailed { // manifest transferred - task.Manifest = uuid.NullUUID{} - os.Remove(task.ManifestFile) - task.ManifestFile = "" - task.Status.Code = xferStatus.Code - task.Status.Message = "" - task.CompletionTime = time.Now() - } - return nil -} - -// returns the duration since the task completed (successfully or otherwise), -// or 0 if the task has not completed -func (task TransferTask) Age() time.Duration { - if task.Status.Code == TransferStatusSucceeded || - task.Status.Code == TransferStatusFailed { - return time.Since(task.CompletionTime) - } else { - return time.Duration(0) - } -} - -// returns true if the task has completed (successfully or not), false otherwise -func (task TransferTask) completed() bool { - for _, subtask := range task.Subtasks { - if 
subtask.TransferStatus.Code != TransferStatusSucceeded && - subtask.TransferStatus.Code != TransferStatusFailed { - return false - } - } - return true -} - -// requests that the task be canceled -func (task *TransferTask) cancel() error { - task.Canceled = true // mark as canceled - - for _, subtask := range task.Subtasks { - subtask.cancel() - } - return nil -} - -// updates the state of a subtask, setting its status as necessary -func (subtask *TransferSubtask) update() error { - var err error - if subtask.Staging.Valid { // we're staging - err = subtask.checkStaging() - } else if subtask.Transfer.Valid { // we're transferring - err = subtask.checkTransfer() - } - return err -} - -// updates the state of a task, setting its status as necessary -func (task *TransferTask) update() error { - var err error - if len(task.Subtasks) == 0 { // new task! - err = task.start() - } else if task.Canceled { // cancellation requested - for _, subtask := range task.Subtasks { - err = subtask.checkCancellation() - } - if task.completed() { - task.CompletionTime = time.Now() - } - } else if task.Manifest.Valid { // we're generating/sending a manifest - err = task.checkManifest() - } else { // update subtasks - - // track subtask failures - var subtaskFailed bool - var failedSubtaskStatus TransferStatus - - // update each subtask and check for failures - subtaskStaging := false - allTransfersSucceeded := true - for _, subtask := range task.Subtasks { - subErr := subtask.update() - // FIXME: vvv is this the right thing to do?? 
vvv - if subErr != nil { - err = subErr - } - - if subtask.StagingStatus == databases.StagingStatusFailed { - subtaskFailed = true - failedSubtaskStatus.Code = TransferStatusUnknown - failedSubtaskStatus.Message = "task cancelled because of staging failure" - } else if subtask.TransferStatus.Code == TransferStatusFailed { - subtaskFailed = true - failedSubtaskStatus.Code = TransferStatusFailed - failedSubtaskStatus.Message = "task cancelled because of transfer failure" - } - if subtask.TransferStatus.Code != TransferStatusSucceeded { - allTransfersSucceeded = false - } - } - - // if a subtask failed, cancel the task -- otherwise, update the task's - // status based on those of its subtasks - if subtaskFailed { - // overwrite only the error code and message fields - task.Status.Code = failedSubtaskStatus.Code - task.Status.Message = failedSubtaskStatus.Message - task.cancel() - } else { - // accumulate statistics - task.Status.NumFiles = 0 - task.Status.NumFilesTransferred = 0 - task.Status.NumFilesSkipped = 0 - for _, subtask := range task.Subtasks { - if subtask.Staging.Valid { - subtaskStaging = true - } else if subtask.Transfer.Valid { - task.Status.NumFiles += subtask.TransferStatus.NumFiles - task.Status.NumFilesTransferred += subtask.TransferStatus.NumFilesTransferred - task.Status.NumFilesSkipped += subtask.TransferStatus.NumFilesSkipped - } - } - } - - if subtaskStaging && task.Status.NumFiles == 0 { - task.Status = TransferStatus{ - Code: TransferStatusStaging, - } - } else if allTransfersSucceeded { // write a manifest - localEndpoint, err := endpoints.NewEndpoint(config.Service.Endpoint) - if err != nil { - return err - } - - // generate a manifest for the transfer - manifest := task.createManifest() - - // write the manifest to disk and begin transferring it to the - // destination endpoint - var manifestBytes []byte - manifestBytes, err = json.Marshal(manifest) - if err != nil { - return fmt.Errorf("marshalling manifest content: %s", err.Error()) - } - 
task.ManifestFile = filepath.Join(config.Service.ManifestDirectory, - fmt.Sprintf("manifest-%s.json", task.Id.String())) - manifestFile, err := os.Create(task.ManifestFile) - if err != nil { - return fmt.Errorf("creating manifest file: %s", err.Error()) - } - _, err = manifestFile.Write(manifestBytes) - if err != nil { - return fmt.Errorf("writing manifest file content: %s", err.Error()) - } - err = manifestFile.Close() - if err != nil { - return fmt.Errorf("closing manifest file: %s", err.Error()) - } - - // construct the source/destination file manifest paths - destination, err := databases.NewDatabase(task.UserInfo.Orcid, task.Destination) - if err != nil { - return err - } - fileXfers := []FileTransfer{ - { - SourcePath: task.ManifestFile, - DestinationPath: filepath.Join(task.DestinationFolder, "manifest.json"), - }, - } - - // begin transferring the manifest - destinationEndpoint, err := destination.Endpoint() - if err != nil { - return err - } - task.Manifest.UUID, err = localEndpoint.Transfer(destinationEndpoint, fileXfers) - if err != nil { - return fmt.Errorf("transferring manifest file: %s", err.Error()) - } - - task.Status = TransferStatus{ - Code: TransferStatusFinalizing, - } - task.Manifest.Valid = true - } else { - // the transfer failed, so make sure we cancel it in case it's still - // trying (because e.g. Globus continues trying transfers for ~3 days!!) - task.cancel() - } - } - return err -} - // loads a map of task IDs to tasks from a previously saved file if available, // or creates an empty map if no such file is available or valid func createOrLoadTasks(dataFile string) map[uuid.UUID]TransferTask {