Skip to content

Commit

Permalink
A bit of reorganization and generalization of database endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-cohere committed Nov 7, 2024
1 parent 5c81e7c commit c651ac9
Show file tree
Hide file tree
Showing 10 changed files with 638 additions and 557 deletions.
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ func validateEndpoints(endpoints map[string]endpointConfig) error {

func validateDatabases(databases map[string]databaseConfig) error {
for name, db := range databases {
if db.Endpoint == "" {
return fmt.Errorf("No endpoint given for database '%s'", name)
if db.Endpoint == "" && len(db.Endpoints) == 0 {
return fmt.Errorf("No endpoints given for database '%s'", name)
} else if db.Endpoint != "" && len(db.Endpoints) > 0 {
return fmt.Errorf("Database '%s' may have EITHER endpoint OR endpoints specified, but not both", name)
}
}
return nil
Expand Down
8 changes: 6 additions & 2 deletions config/database_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type databaseConfig struct {
Name string `yaml:"name"`
// the name of the organization hosting the database
Organization string `yaml:"organization"`
// the name of an endpoint for this database
Endpoint string `yaml:"endpoint"`
// if set, the name of the single endpoint available to this database
// (only one of Endpoint and Endpoints may be set)
Endpoint string `yaml:"endpoint,omitempty"`
// if set, a set of endpoints assigned functional names, available to this
// database (only one of Endpoint and Endpoints may be set)
Endpoints map[string]string `yaml:"endpoints,omitempty"`
}
12 changes: 9 additions & 3 deletions databases/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/google/uuid"

"github.com/kbase/dts/endpoints"
"github.com/kbase/dts/frictionless"
)

Expand Down Expand Up @@ -96,8 +95,6 @@ type Database interface {
StageFiles(fileIds []string) (uuid.UUID, error)
// returns the status of a given staging operation
StagingStatus(id uuid.UUID) (StagingStatus, error)
// returns the endpoint associated with this database
Endpoint() (endpoints.Endpoint, error)
// returns the local username associated with the given Orcid ID
LocalUser(orcid string) (string, error)
}
Expand Down Expand Up @@ -141,6 +138,15 @@ func (e InvalidSearchParameter) Error() string {
return fmt.Sprintf("Invalid search parameter for database %s: %s", e.Database, e.Message)
}

// InvalidEndpointsError is returned when a database's endpoint configuration
// is invalid.
type InvalidEndpointsError struct {
	// name of the database whose endpoint configuration was rejected
	Database string
	// description of what is wrong with the configuration
	Message string
}

// Error implements the error interface, reporting the database name followed
// by the description of the problem.
func (e InvalidEndpointsError) Error() string {
	const format = "Invalid endpoint configuration for database %s: %s"
	return fmt.Sprintf(format, e.Database, e.Message)
}

// we maintain a table of database instances, identified by their names
var allDatabases = make(map[string]Database)

Expand Down
20 changes: 15 additions & 5 deletions databases/jdp/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/kbase/dts/config"
"github.com/kbase/dts/credit"
"github.com/kbase/dts/databases"
"github.com/kbase/dts/endpoints"
"github.com/kbase/dts/frictionless"
)

Expand Down Expand Up @@ -359,6 +358,14 @@ func NewDatabase(orcid string) (databases.Database, error) {
}
}

// make sure we are using only a single endpoint
if config.Databases["jdp"].Endpoint == "" {
return nil, databases.InvalidEndpointsError{
Database: "jdp",
Message: "The JGI data portal should only have a single endpoint configured.",
}
}

return &Database{
Id: "jdp",
Orcid: orcid,
Expand Down Expand Up @@ -641,6 +648,10 @@ func (db *Database) Search(params databases.SearchParameters) (databases.SearchR
}

func (db *Database) Resources(fileIds []string) ([]frictionless.DataResource, error) {
// the JDP only uses a single Globus endpoint, so we can associate all
// resources with this endpoint
resourceEndpoint := config.Databases["jdp"].Endpoint

// strip the "JDP:" prefix from our files and create a mapping from IDs to
// their original order so we can hand back metadata accordingly
strippedFileIds := make([]string, len(fileIds))
Expand Down Expand Up @@ -728,6 +739,9 @@ func (db *Database) Resources(fileIds []string) ([]frictionless.DataResource, er
// NOTE: solution
resources[index].Format = formatFromFileName(resources[index].Path)
resources[index].MediaType = mimeTypeFromFormatAndTypes(resources[index].Format, []string{})

// set the endpoint for the resource
resources[index].Endpoint = resourceEndpoint
}
return resources, err
}
Expand Down Expand Up @@ -825,10 +839,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error)
}
}

func (db *Database) Endpoint() (endpoints.Endpoint, error) {
return endpoints.NewEndpoint(config.Databases[db.Id].Endpoint)
}

func (db *Database) LocalUser(orcid string) (string, error) {
// no current mechanism for this
return "localuser", nil
Expand Down
6 changes: 0 additions & 6 deletions databases/kbase/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"github.com/google/uuid"

"github.com/kbase/dts/auth"
"github.com/kbase/dts/config"
"github.com/kbase/dts/databases"
"github.com/kbase/dts/endpoints"
"github.com/kbase/dts/frictionless"
)

Expand Down Expand Up @@ -74,10 +72,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error)
return databases.StagingStatusUnknown, err
}

func (db *Database) Endpoint() (endpoints.Endpoint, error) {
return endpoints.NewEndpoint(config.Databases[db.Id].Endpoint)
}

func (db *Database) LocalUser(orcid string) (string, error) {
// for KBase user federation, we rely on a table maintained by our KBase
// auth server proxy
Expand Down
21 changes: 16 additions & 5 deletions databases/nmdc/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/kbase/dts/config"
"github.com/kbase/dts/credit"
"github.com/kbase/dts/databases"
"github.com/kbase/dts/endpoints"
"github.com/kbase/dts/frictionless"
)

Expand Down Expand Up @@ -220,6 +219,22 @@ func NewDatabase(orcid string) (databases.Database, error) {
}
*/

// NMDC files are reached through two Globus endpoints identified by the
// functional names "nersc" and "emsl", so a configuration that names a
// single endpoint is rejected
if config.Databases["nmdc"].Endpoint != "" {
	return nil, databases.InvalidEndpointsError{
		Database: "nmdc",
		Message:  "NMDC requires \"nersc\" and \"emsl\" endpoints to be specified",
	}
}
// both functional names must be present in the configured endpoints
// (the second name is "emsl"; it was previously misspelled "esml", which
// made this check fail for every correctly-configured database)
for _, functionalName := range []string{"nersc", "emsl"} {
	if _, found := config.Databases["nmdc"].Endpoints[functionalName]; !found {
		return nil, databases.InvalidEndpointsError{
			Database: "nmdc",
			Message:  fmt.Sprintf("Could not find \"%s\" endpoint for NMDC database", functionalName),
		}
	}
}

return &Database{
Id: "nmdc",
Orcid: orcid,
Expand Down Expand Up @@ -811,10 +826,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error)
return databases.StagingStatusSucceeded, nil
}

func (db *Database) Endpoint() (endpoints.Endpoint, error) {
return endpoints.NewEndpoint(config.Databases[db.Id].Endpoint)
}

func (db *Database) LocalUser(orcid string) (string, error) {
// no current mechanism for this
return "localuser", nil
Expand Down
2 changes: 2 additions & 0 deletions frictionless/frictionless.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type DataResource struct {
Sources []DataSource `json:"sources,omitempty"`
// a title or label for the resource (optional)
Title string `json:"title,omitempty"`
// the name of the endpoint at which this resource is accessed (not exposed to JSON)
Endpoint string
}

// call this to get a string containing the name of the hashing algorithm used
Expand Down
203 changes: 203 additions & 0 deletions tasks/subtask.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright (c) 2023 The KBase Project and its Contributors
// Copyright (c) 2023 Cohere Consulting, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
// of the Software, and to permit persons to whom the Software is furnished to do
// so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package tasks

import (
"path/filepath"

"github.com/google/uuid"

"github.com/kbase/dts/auth"
"github.com/kbase/dts/databases"
"github.com/kbase/dts/endpoints"
)

// TransferSubtask tracks a subtask within a transfer (e.g. files transferred
// from multiple endpoints attached to a single source/destination database
// pair). It holds multiple (possibly null) UUIDs corresponding to different
// states in the file transfer lifecycle: Staging is marked valid while files
// are being staged, and Transfer is marked valid while files are being
// transferred.
type TransferSubtask struct {
Destination string // name of destination database (in config)
DestinationEndpoint string // name of destination endpoint (in config)
DestinationFolder string // folder path to which files are transferred
Resources []DataResource // Frictionless DataResources for files
Source string // name of source database (in config)
SourceEndpoint string // name of source endpoint (in config)
Staging uuid.NullUUID // staging UUID (valid only while staging)
StagingStatus databases.StagingStatus // staging status last reported by the source database
Transfer uuid.NullUUID // file transfer UUID (valid only while transferring)
TransferStatus TransferStatus // status of file transfer operation
UserInfo auth.UserInfo // info about user requesting transfer
}

// start begins work on the subtask. It asks the source endpoint whether the
// subtask's files are already staged. If they are, the transfer is initiated
// right away via beginTransfer. Otherwise the source database is asked to
// stage the files, the resulting staging UUID is recorded, and the subtask's
// status is set to TransferStatusStaging.
//
// Errors from creating the endpoint or database, from the staging check, from
// the staging request, or from beginTransfer are returned unchanged. If the
// staging request fails, subtask.Staging is left untouched so the subtask is
// never marked as staging with an invalid UUID.
func (subtask *TransferSubtask) start() error {
	// are the files already staged? (only works for public data)
	sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint)
	if err != nil {
		return err
	}
	staged, err := sourceEndpoint.FilesStaged(subtask.Resources)
	if err != nil {
		return err
	}
	if staged {
		return subtask.beginTransfer()
	}

	// tell the source database to stage the files and remember the ID of the
	// staging operation
	source, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source)
	if err != nil {
		return err
	}
	fileIds := make([]string, len(subtask.Resources))
	for i, resource := range subtask.Resources {
		fileIds[i] = resource.Id
	}
	stagingId, err := source.StageFiles(fileIds)
	if err != nil {
		return err
	}

	// only a successful request puts the subtask into the staging state
	subtask.Staging = uuid.NullUUID{UUID: stagingId, Valid: true}
	subtask.TransferStatus = TransferStatus{
		Code:     TransferStatusStaging,
		NumFiles: len(subtask.Resources),
	}
	return nil
}

// beginTransfer initiates a file transfer for the subtask's resources from
// the source endpoint to the destination endpoint. On success the staging
// UUID is cleared, the transfer UUID is marked valid, and the status becomes
// TransferStatusActive. Any error creating an endpoint or starting the
// transfer is returned unchanged.
func (subtask *TransferSubtask) beginTransfer() error {
	// describe each file to be moved: it keeps its relative path, rooted at
	// the destination folder
	numFiles := len(subtask.Resources)
	transfers := make([]FileTransfer, 0, numFiles)
	for _, res := range subtask.Resources {
		transfers = append(transfers, FileTransfer{
			SourcePath:      res.Path,
			DestinationPath: filepath.Join(subtask.DestinationFolder, res.Path),
			Hash:            res.Hash,
		})
	}

	// look up both ends of the transfer and kick it off
	src, err := endpoints.NewEndpoint(subtask.SourceEndpoint)
	if err != nil {
		return err
	}
	dst, err := endpoints.NewEndpoint(subtask.DestinationEndpoint)
	if err != nil {
		return err
	}
	if subtask.Transfer.UUID, err = src.Transfer(dst, transfers); err != nil {
		return err
	}

	// the subtask has left the staging state and is now transferring
	subtask.TransferStatus = TransferStatus{
		Code:     TransferStatusActive,
		NumFiles: numFiles,
	}
	subtask.Staging = uuid.NullUUID{}
	subtask.Transfer.Valid = true
	return nil
}

// checkStaging asks the source database for the status of the subtask's
// staging operation and records it in subtask.StagingStatus. If staging has
// succeeded, the file transfer is initiated via beginTransfer; for any other
// status nothing further happens. Errors are returned unchanged.
func (subtask *TransferSubtask) checkStaging() error {
	db, err := databases.NewDatabase(subtask.UserInfo.Orcid, subtask.Source)
	if err != nil {
		return err
	}

	// the database is the authority on whether the files are staged
	if subtask.StagingStatus, err = db.StagingStatus(subtask.Staging.UUID); err != nil {
		return err
	}
	if subtask.StagingStatus != databases.StagingStatusSucceeded {
		return nil // nothing to do yet
	}

	// staged! move along to the transfer
	return subtask.beginTransfer()
}

// checkTransfer asks the source endpoint for the status of the subtask's
// file transfer and records it in subtask.TransferStatus. Once the transfer
// has finished (succeeded or failed), the transfer UUID is cleared so the
// subtask is no longer considered to be transferring. Errors creating the
// endpoint or fetching the status are returned unchanged, leaving the subtask
// unmodified.
func (subtask *TransferSubtask) checkTransfer() error {
	// has the data transfer completed?
	sourceEndpoint, err := endpoints.NewEndpoint(subtask.SourceEndpoint)
	if err != nil {
		return err
	}
	xferStatus, err := sourceEndpoint.Status(subtask.Transfer.UUID)
	if err != nil {
		return err
	}

	// record the latest status (as checkCancellation does); without this the
	// outcome of the transfer would be lost once the UUID is cleared below
	subtask.TransferStatus = xferStatus

	switch xferStatus.Code {
	case TransferStatusSucceeded, TransferStatusFailed: // transfer finished
		subtask.Transfer = uuid.NullUUID{}
	}
	return nil
}

// cancel requests cancellation of the subtask's file transfer, if one is in
// progress. When no transfer UUID is valid there is nothing to cancel and nil
// is returned. Errors creating the source endpoint or issuing the
// cancellation request are returned unchanged.
func (subtask *TransferSubtask) cancel() error {
	if !subtask.Transfer.Valid {
		return nil // not transferring
	}

	// the transfer is canceled through the source endpoint, using its UUID
	src, err := endpoints.NewEndpoint(subtask.SourceEndpoint)
	if err != nil {
		return err
	}
	return src.Cancel(subtask.Transfer.UUID)
}

// checkCancellation updates the status of a canceled subtask depending on
// where it is in its lifecycle. If a transfer is underway, the status is
// refreshed from the source endpoint; otherwise the subtask is marked as
// failed with a message noting the cancellation.
func (subtask *TransferSubtask) checkCancellation() error {
	if !subtask.Transfer.Valid {
		// at any other point in the lifecycle, terminate the task
		subtask.TransferStatus.Code = TransferStatusFailed
		subtask.TransferStatus.Message = "Task canceled at user request"
		return nil
	}

	// a transfer is underway, so its endpoint reports the current status
	src, err := endpoints.NewEndpoint(subtask.SourceEndpoint)
	if err != nil {
		return err
	}
	subtask.TransferStatus, err = src.Status(subtask.Transfer.UUID)
	return err
}

// update advances the subtask according to its current state: a subtask with
// a valid staging UUID has its staging checked, one with a valid transfer UUID
// has its transfer checked, and any other subtask is left alone.
func (subtask *TransferSubtask) update() error {
	switch {
	case subtask.Staging.Valid: // we're staging
		return subtask.checkStaging()
	case subtask.Transfer.Valid: // we're transferring
		return subtask.checkTransfer()
	default:
		return nil
	}
}
Loading

0 comments on commit c651ac9

Please sign in to comment.