Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add segmentation pipeline #3106

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
d2ab220
Initial addition of segmentation route.
phorne-uncharted Nov 5, 2021
3cf15f9
Added working segmentation route as well as a placeholder band combin…
phorne-uncharted Nov 12, 2021
d126852
Added config param to disable segmentation layer.
phorne-uncharted Nov 16, 2021
1018d1e
Merge main
phorne-uncharted Nov 16, 2021
be7dc5f
Initial changes to run segmentation as a model search.
phorne-uncharted Jul 11, 2022
5122b29
Added fitted pipeline id to the output of a fully specified pipeline …
phorne-uncharted Jul 15, 2022
2b4aad3
Solution id for fully specified pipelines now captured as part of the…
phorne-uncharted Jul 15, 2022
efef6d9
Segmentation predictions updated to output the same data as the model…
phorne-uncharted Jul 19, 2022
a451847
Segmentation predictions now write the segmentation layer to disk.
phorne-uncharted Jul 20, 2022
173c808
User can specify task properly now.
phorne-uncharted Jul 21, 2022
c2961e9
Added postgres schema version check on startup.
phorne-uncharted Jul 22, 2022
d7f15b3
Added storage of request task to solution storage.
phorne-uncharted Jul 22, 2022
50bf17f
Persisted fitted solutions now also save the task.
phorne-uncharted Jul 22, 2022
a0f80ad
Prediction task uses the model task if it was saved.
phorne-uncharted Jul 22, 2022
ac6d853
Segmentation pipelines are no longer cached as they need to be treate…
phorne-uncharted Jul 25, 2022
38f1743
Adds parameters for GPU batch size and fixes segment map opacity
cdbethune Aug 19, 2022
b4207b9
updates to latest distil-compute
cdbethune Aug 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/compute/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func filterData(client *compute.Client, ds *api.Dataset, filterParams *api.Filte

// output the filtered results as the data in the filtered dataset
_, outputDataFile := getPreFilteringOutputDataFile(outputFolder)
err = util.CopyFile(filteredData, outputDataFile)
err = util.CopyFile(filteredData.ResultURI, outputDataFile)
if err != nil {
return "", nil, err
}
Expand Down
37 changes: 27 additions & 10 deletions api/compute/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,17 @@ type QueueItem struct {

// QueueResponse represents the result from processing a queue item.
type QueueResponse struct {
Output interface{}
Output *PipelineOutput
Error error
}

// PipelineOutput represents an output from executing a queued pipeline.
type PipelineOutput struct {
SolutionID string
ResultURI string
FittedSolutionID string
}

// Queue uses a buffered channel to queue tasks and provides the result via channels.
type Queue struct {
mu sync.RWMutex
Expand Down Expand Up @@ -194,6 +201,9 @@ func (q *Queue) Done() {
// InitializeCache sets up an empty cache or if a source file provided, reads
// the cache from the source file.
func InitializeCache(sourceFile string, readEnabled bool) error {
// register the output type for the cache!
gob.Register(&PipelineOutput{})

var c *gc.Cache
if util.FileExists(sourceFile) {
b, err := ioutil.ReadFile(sourceFile)
Expand Down Expand Up @@ -234,7 +244,7 @@ func InitializeQueue(config *env.Config) {

// SubmitPipeline executes pipelines using the client and returns the result URI.
func SubmitPipeline(client *compute.Client, datasets []string, datasetsProduce []string, searchRequest *pipeline.SearchSolutionsRequest,
fullySpecifiedStep *description.FullySpecifiedPipeline, allowedValueTypes []string, shouldCache bool) (string, error) {
fullySpecifiedStep *description.FullySpecifiedPipeline, allowedValueTypes []string, shouldCache bool) (*PipelineOutput, error) {

request := compute.NewExecPipelineRequest(datasets, datasetsProduce, fullySpecifiedStep.Pipeline)

Expand All @@ -254,12 +264,12 @@ func SubmitPipeline(client *compute.Client, datasets []string, datasetsProduce [
if cache.readEnabled {
if shouldCache {
if err != nil {
return "", err
return nil, err
}
entry, found := cache.cache.Get(hashedPipelineUniqueKey)
if found {
log.Infof("returning cached entry for pipeline")
return entry.(string), nil
return entry.(*PipelineOutput), nil
}
}
} else {
Expand All @@ -268,25 +278,24 @@ func SubmitPipeline(client *compute.Client, datasets []string, datasetsProduce [
// get equivalency key for enqueuing
hashedPipelineEquivKey, err := queueTask.hashEquivalent()
if err != nil {
return "", err
return nil, err
}

resultChan := queue.Enqueue(hashedPipelineEquivKey, queueTask)
defer queue.Done()

result := <-resultChan
if result.Error != nil {
return "", result.Error
return nil, result.Error
}

datasetURI := result.Output.(string)
cache.cache.Set(hashedPipelineUniqueKey, datasetURI, gc.DefaultExpiration)
cache.cache.Set(hashedPipelineUniqueKey, result.Output, gc.DefaultExpiration)
err = cache.PersistCache()
if err != nil {
log.Warnf("error persisting cache: %v", err)
}

return datasetURI, nil
return result.Output, nil
}

func runPipelineQueue(queue *Queue) {
Expand Down Expand Up @@ -316,6 +325,8 @@ func runPipelineQueue(queue *Queue) {
// listen for completion
var errPipeline error
var datasetURI string
var fittedSolutionID string
var solutionID string
err = pipelineTask.request.Listen(func(status compute.ExecPipelineStatus) {
// check for error
if status.Error != nil {
Expand All @@ -324,6 +335,8 @@ func runPipelineQueue(queue *Queue) {

if status.Progress == compute.RequestCompletedStatus {
datasetURI = status.ResultURI
fittedSolutionID = status.FittedSolutionID
solutionID = status.SolutionID
}
})
if err != nil {
Expand All @@ -342,7 +355,11 @@ func runPipelineQueue(queue *Queue) {

datasetURI = strings.Replace(datasetURI, "file://", "", -1)

queueTask.returnResult(&QueueResponse{Output: datasetURI})
queueTask.returnResult(&QueueResponse{&PipelineOutput{
ResultURI: datasetURI,
FittedSolutionID: fittedSolutionID,
SolutionID: solutionID,
}, nil})
}

log.Infof("ending queue processing")
Expand Down
3 changes: 2 additions & 1 deletion api/compute/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type searchResult struct {
type pipelineSearchContext struct {
searchID string
dataset string
task []string
storageName string
sourceDatasetURI string
trainDatasetURI string
Expand Down Expand Up @@ -278,7 +279,7 @@ func (s *SolutionRequest) dispatchSolutionSearchPipeline(statusChan chan Solutio
if ok {
// reformat result to have one row per d3m index since confidences
// can produce one row / class
resultURI, err = reformatResult(resultURI)
resultURI, err = reformatResult(resultURI, s.TargetFeature.HeaderName)
if err != nil {
return nil, err
}
Expand Down
58 changes: 58 additions & 0 deletions api/compute/segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//
// Copyright © 2021 Uncharted Software Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compute

import (
"strconv"

"github.com/pkg/errors"

"github.com/uncharted-distil/distil/api/util/imagery"
)

// BuildSegmentationImage uses the raw segmentation output to build an image layer.
// The input is expected to be a header row followed by data rows of the form
// [d3mIndex string, mask []interface{} of []interface{} of string-encoded floats].
// It returns a mapping of d3m index to the PNG-encoded segmentation mask, or an
// error if any row does not match the expected shape.
func BuildSegmentationImage(rawSegmentation [][]interface{}) (map[string][]byte, error) {
	// output is mapping of d3m index to new segmentation layer
	output := map[string][]byte{}

	// nothing to do when there is no data (or only a header row) — avoids a
	// slice-bounds panic on rawSegmentation[1:] for empty input
	if len(rawSegmentation) <= 1 {
		return output, nil
	}

	// need to output all the masks as images (skip the header row)
	for _, r := range rawSegmentation[1:] {
		if len(r) < 2 {
			return nil, errors.Errorf("segmentation row has %d fields, expected at least 2", len(r))
		}

		// create the image that captures the mask; use checked assertions so
		// malformed rows surface as errors rather than panics
		d3mIndex, ok := r[0].(string)
		if !ok {
			return nil, errors.Errorf("d3m index is not a string (got %T)", r[0])
		}
		rawMask, ok := r[1].([]interface{})
		if !ok {
			return nil, errors.Errorf("mask for d3m index %s is not a nested list (got %T)", d3mIndex, r[1])
		}
		rawFloats := make([][]float64, len(rawMask))
		for i, f := range rawMask {
			dataF, ok := f.([]interface{})
			if !ok {
				return nil, errors.Errorf("mask row %d for d3m index %s is not a list (got %T)", i, d3mIndex, f)
			}
			nestedFloats := make([]float64, len(dataF))
			for j, nf := range dataF {
				s, ok := nf.(string)
				if !ok {
					return nil, errors.Errorf("mask value (%d, %d) for d3m index %s is not a string (got %T)", i, j, d3mIndex, nf)
				}
				fp, err := strconv.ParseFloat(s, 64)
				if err != nil {
					return nil, errors.Wrapf(err, "unable to parse mask")
				}
				nestedFloats[j] = fp
			}
			rawFloats[i] = nestedFloats
		}

		// render the parsed confidence matrix as a fully-opaque PNG layer
		filter := imagery.ConfidenceMatrixToImage(rawFloats, imagery.MagmaColorScale, uint8(255))
		imageBytes, err := imagery.ImageToPNG(filter)
		if err != nil {
			return nil, err
		}
		output[d3mIndex] = imageBytes
	}

	return output, nil
}
Loading