Skip to content

Commit

Permalink
Merge pull request #68 from unchartedsoftware/add-unicorn-feature
Browse files Browse the repository at this point in the history
Add unicorn feature
  • Loading branch information
kbirk authored Jul 18, 2018
2 parents 67916c8 + a8c1543 commit 695411a
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 8 deletions.
29 changes: 29 additions & 0 deletions cluster_all.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

# Runs the distil-cluster tool over every seed dataset, writing cluster
# labels and an updated schema doc into each dataset's TRAIN folder.

DATA_DIR=~/datasets/seed_datasets_current
SCHEMA=datasetDoc.json
OUTPUT_DATA=clusters/clusters.csv
OUTPUT_SCHEMA=clustersDatasetDoc.json
DATASET_FOLDER_SUFFIX=_dataset
DATASETS=(26_radon_seed 32_wikiqa 60_jester 185_baseball 196_autoMpg 313_spectrometer 38_sick 1491_one_hundred_plants_margin 27_wordLevels 57_hypothyroid 299_libras_move 534_cps_85_wages 1567_poker_hand 22_handgeometry)
HAS_HEADER=1
CLUSTER_FUNCTION=fileupload
# BUGFIX: URL schemes must be lowercase — Go's HTTP transport rejects
# "HTTP://..." with "unsupported protocol scheme".
REST_ENDPOINT=http://localhost:5000
DATA_SERVER=http://10.108.4.104

for DATASET in "${DATASETS[@]}"
do
    echo "--------------------------------------------------------------------------------"
    echo " Clustering $DATASET dataset"
    echo "--------------------------------------------------------------------------------"
    go run cmd/distil-cluster/main.go \
        --rest-endpoint="$REST_ENDPOINT" \
        --cluster-function="$CLUSTER_FUNCTION" \
        --dataset="$DATA_DIR/${DATASET}/TRAIN/dataset_TRAIN" \
        --media-path="$DATA_SERVER/${DATASET}" \
        --schema="$DATA_DIR/${DATASET}/TRAIN/dataset_TRAIN/$SCHEMA" \
        --output="$DATA_DIR/${DATASET}/TRAIN/dataset_TRAIN" \
        --output-data="$OUTPUT_DATA" \
        --output-schema="$OUTPUT_SCHEMA" \
        --has-header=$HAS_HEADER
done
158 changes: 158 additions & 0 deletions cmd/distil-cluster/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package main

import (
"os"
"path"
"runtime"
"strings"

"github.com/pkg/errors"
"github.com/unchartedsoftware/plog"
"github.com/urfave/cli"

"github.com/unchartedsoftware/distil-ingest/feature"
"github.com/unchartedsoftware/distil-ingest/metadata"
"github.com/unchartedsoftware/distil-ingest/rest"
)

// splitAndTrim splits a comma-separated argument string and strips
// surrounding whitespace from each piece. An empty argument yields a
// nil slice.
//
// NOTE(review): this helper is not referenced anywhere in this file;
// confirm it is still needed before keeping it.
func splitAndTrim(arg string) []string {
	if arg == "" {
		return nil
	}
	parts := strings.Split(arg, ",")
	trimmed := make([]string, 0, len(parts))
	for _, p := range parts {
		trimmed = append(trimmed, strings.TrimSpace(p))
	}
	return trimmed
}

// main wires up the distil-cluster CLI: it validates the required flags,
// prepares the cluster output folder, loads the dataset metadata, and runs
// the clustering step against the configured REST featurizer endpoint.
func main() {

	runtime.GOMAXPROCS(runtime.NumCPU())

	app := cli.NewApp()
	app.Name = "distil-cluster"
	app.Version = "0.1.0"
	app.Usage = "Cluster D3M datasets"
	app.UsageText = "distil-cluster --rest-endpoint=<url> --cluster-function=<function> --dataset=<filepath> --output=<filepath>"
	app.Flags = []cli.Flag{
		cli.StringFlag{
			Name:  "rest-endpoint",
			Value: "",
			Usage: "The REST endpoint url",
		},
		cli.StringFlag{
			Name:  "cluster-function",
			Value: "",
			Usage: "The clustering function to use",
		},
		cli.StringFlag{
			Name:  "dataset",
			Value: "",
			Usage: "The dataset source path",
		},
		cli.StringFlag{
			Name:  "schema",
			Value: "",
			Usage: "The schema source path",
		},
		cli.StringFlag{
			Name:  "filetype",
			Value: "csv",
			Usage: "The dataset file type",
		},
		cli.StringFlag{
			Name:  "output",
			Value: "",
			Usage: "The clustering output file path",
		},
		cli.StringFlag{
			Name:  "media-path",
			Value: "",
			Usage: "The path to the folder containing the media subfolder that is accessible for clustering",
		},
		cli.StringFlag{
			Name:  "output-schema",
			Value: "",
			Usage: "The path to use as output for the clustered schema document",
		},
		cli.StringFlag{
			Name:  "output-data",
			Value: "",
			Usage: "The path to use as output for the clustered data",
		},
		cli.BoolFlag{
			Name:  "has-header",
			Usage: "Whether or not the CSV file has a header row",
		},
	}
	app.Action = func(c *cli.Context) error {
		// required flags
		if c.String("rest-endpoint") == "" {
			return cli.NewExitError("missing commandline flag `--rest-endpoint`", 1)
		}
		if c.String("cluster-function") == "" {
			return cli.NewExitError("missing commandline flag `--cluster-function`", 1)
		}
		if c.String("dataset") == "" {
			return cli.NewExitError("missing commandline flag `--dataset`", 1)
		}

		clusterFunction := c.String("cluster-function")
		restBaseEndpoint := c.String("rest-endpoint")
		datasetPath := c.String("dataset")
		mediaPath := c.String("media-path")
		outputSchema := c.String("output-schema")
		outputData := c.String("output-data")
		schemaPath := c.String("schema")
		outputFilePath := c.String("output")
		hasHeader := c.Bool("has-header")

		// initialize REST client
		log.Infof("Using REST interface at `%s` ", restBaseEndpoint)
		client := rest.NewClient(restBaseEndpoint)

		// create the cluster output folder, replacing any previous run's data
		clusterPath := path.Join(outputFilePath, "clusters")
		if dirExists(clusterPath) {
			// delete existing data to overwrite with latest
			// BUGFIX: the RemoveAll error was previously silently dropped;
			// a failed delete would have produced mixed stale/new output
			if err := os.RemoveAll(clusterPath); err != nil {
				log.Errorf("%v", err)
				return cli.NewExitError(errors.Cause(err), 2)
			}
			log.Infof("Deleted data at %s", clusterPath)
		}
		if err := os.MkdirAll(clusterPath, 0777); err != nil && !os.IsExist(err) {
			log.Errorf("%v", err)
			return cli.NewExitError(errors.Cause(err), 2)
		}
		// best-effort removal of a stale schema doc from a previous run.
		// BUGFIX: filename was "clusterDatasetDoc.json" (missing the "s"),
		// which never matched the "clustersDatasetDoc.json" the scripts use.
		os.Remove(path.Join(outputFilePath, "clustersDatasetDoc.json"))

		// create featurizer
		featurizer := rest.NewFeaturizer(clusterFunction, client)

		// load metadata from original schema
		meta, err := metadata.LoadMetadataFromOriginalSchema(schemaPath)
		if err != nil {
			log.Errorf("%v", err)
			return cli.NewExitError(errors.Cause(err), 2)
		}

		// cluster the dataset and write out the augmented data + schema
		err = feature.ClusterDataset(meta, featurizer, datasetPath, mediaPath, outputFilePath, outputData, outputSchema, hasHeader)
		if err != nil {
			log.Errorf("%v", err)
			return cli.NewExitError(errors.Cause(err), 2)
		}

		log.Infof("Clustered data written to %s", outputFilePath)

		return nil
	}
	// run app
	// BUGFIX: the Run error was previously silently dropped
	if err := app.Run(os.Args); err != nil {
		log.Errorf("%v", err)
	}
}

// dirExists reports whether something exists at the given filesystem path.
// Note: despite the name, it returns true for regular files too, and for
// stat errors other than "not exist" (matching the original behavior).
func dirExists(path string) bool {
	_, err := os.Stat(path)
	return !os.IsNotExist(err)
}
146 changes: 146 additions & 0 deletions feature/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package feature

import (
"bytes"
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strconv"

"github.com/pkg/errors"
"github.com/unchartedsoftware/plog"

"github.com/unchartedsoftware/distil-ingest/metadata"
"github.com/unchartedsoftware/distil-ingest/rest"
)

// ClusterDataset clusters data based on referenced data resources
// in the metadata. The clusters are added as a variable in the metadata.
// ClusterDataset clusters data based on referenced data resources
// in the metadata. The clusters are added as a variable in the metadata,
// a cluster label is appended to every row of the main data resource, the
// augmented data is written to outputFolder/outputPathData, and an updated
// schema document pointing at the new data file is written to
// outputFolder/outputPathSchema. hasHeader indicates whether the source CSV
// has a header row (one is written to the output in that case).
func ClusterDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer, sourcePath string, mediaPath string, outputFolder string, outputPathData string, outputPathSchema string, hasHeader bool) error {
	// find the main data resource
	mainDR := meta.GetMainDataResource()

	// add cluster variables to the schema; the returned map identifies the
	// columns whose referenced media must be submitted for clustering
	log.Infof("adding clusters to schema")
	colsToFeaturize := addFeaturesToSchema(meta, mainDR, "_cluster_")

	// BUGFIX: capture the column order once. The original ranged over the
	// map separately per row; Go map iteration order is not stable, so with
	// more than one clustered column the appended labels could land in a
	// different order on different rows.
	colIndices := make([]int, 0, len(colsToFeaturize))
	for index := range colsToFeaturize {
		colIndices = append(colIndices, index)
	}

	// read the data to process every row
	log.Infof("opening data from source")
	dataPath := path.Join(sourcePath, mainDR.ResPath)
	csvFile, err := os.Open(dataPath)
	if err != nil {
		return errors.Wrap(err, "failed to open data file")
	}
	defer csvFile.Close()
	reader := csv.NewReader(csvFile)

	// initialize csv writer; output accumulates in memory and is flushed to
	// disk in a single write below
	output := &bytes.Buffer{}
	writer := csv.NewWriter(output)

	// write the header as needed
	if hasHeader {
		// NOTE(review): assumes every variable's Index is < len(Variables);
		// an out-of-range index would panic here — confirm upstream.
		header := make([]string, len(mainDR.Variables))
		for _, v := range mainDR.Variables {
			header[v.Index] = v.Name
		}
		err = writer.Write(header)
		if err != nil {
			return errors.Wrap(err, "error writing header to output")
		}
		// skip the source header row
		_, err = reader.Read()
		if err != nil {
			return errors.Wrap(err, "failed to read header from file")
		}
	}

	// build the list of files to submit for clustering
	files := make([]string, 0)
	lines := make([][]string, 0)
	log.Infof("reading data from source")
	for {
		line, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return errors.Wrap(err, "failed to read line from file")
		}
		lines = append(lines, line)

		// collect the media file referenced by each clusterable column
		for _, index := range colIndices {
			colDR := colsToFeaturize[index]
			imagePath := fmt.Sprintf("%s/%s", mediaPath, path.Join(colDR.originalResPath, line[index]))
			files = append(files, imagePath)
		}
	}

	// cluster the files
	log.Infof("Clustering data with featurizer")
	clusteredImages, err := clusterImages(files, imageFeaturizer)
	if err != nil {
		return errors.Wrap(err, "failed to cluster images using featurizer")
	}

	// append and output the new clustered data
	log.Infof("Adding cluster labels to source data")
	for _, l := range lines {
		for _, index := range colIndices {
			colDR := colsToFeaturize[index]
			imagePath := fmt.Sprintf("%s/%s", mediaPath, path.Join(colDR.originalResPath, l[index]))
			l = append(l, clusteredImages[imagePath])
		}

		// BUGFIX: the Write error was previously dropped and a stale err
		// value from the clustering call was checked instead
		err = writer.Write(l)
		if err != nil {
			return errors.Wrap(err, "error storing featured output")
		}
	}

	// output the data
	log.Infof("Writing data to output")
	dataPathToWrite := path.Join(outputFolder, outputPathData)
	writer.Flush()
	// surface any error the csv writer buffered during Write/Flush
	if err := writer.Error(); err != nil {
		return errors.Wrap(err, "error flushing csv output")
	}
	err = ioutil.WriteFile(dataPathToWrite, output.Bytes(), 0644)
	if err != nil {
		return errors.Wrap(err, "error writing feature output")
	}

	// main DR should point to new file
	mainDR.ResPath = outputPathData

	// output the schema
	log.Infof("Writing schema to output")
	schemaPathToWrite := path.Join(outputFolder, outputPathSchema)
	return errors.Wrap(meta.WriteSchema(schemaPathToWrite), "error writing schema output")
}

// clusterImages submits the given file paths to the featurizer service and
// returns a map from file path to the cluster label assigned to it. The
// service response is expected to hold a "pred_class" object mapping the
// stringified index of each submitted file to a numeric cluster id.
func clusterImages(filepaths []string, featurizer *rest.Featurizer) (map[string]string, error) {
	feature, err := featurizer.ClusterImages(filepaths)
	if err != nil {
		return nil, errors.Wrap(err, "failed to cluster images")
	}

	preds, ok := feature.Image["pred_class"].(map[string]interface{})
	if !ok {
		return nil, errors.Errorf("image feature objects in unexpected format")
	}

	clusters := make(map[string]string, len(preds))
	for i, c := range preds {
		index, err := strconv.ParseInt(i, 10, 64)
		if err != nil {
			// BUGFIX: message previously read "failed parse file index"
			return nil, errors.Wrap(err, "failed to parse file index")
		}
		// BUGFIX: guard against an out-of-range index from the service,
		// which previously panicked on the slice access below
		if index < 0 || index >= int64(len(filepaths)) {
			return nil, errors.Errorf("file index %d out of range", index)
		}
		// JSON numbers decode as float64
		cluster, ok := c.(float64)
		if !ok {
			return nil, errors.Errorf("failed to parse file cluster")
		}
		clusters[filepaths[index]] = strconv.Itoa(int(cluster))
	}

	return clusters, nil
}
10 changes: 3 additions & 7 deletions feature/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func FeaturizeDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer,

// featurize image columns
log.Infof("adding features to schema")
colsToFeaturize := addFeaturesToSchema(meta, mainDR)
colsToFeaturize := addFeaturesToSchema(meta, mainDR, "_feature_")

// read the data to process every row
log.Infof("opening data from source")
Expand All @@ -68,10 +68,6 @@ func FeaturizeDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer,
if err != nil {
return errors.Wrap(err, "error writing header to output")
}
}

// skip header
if hasHeader {
_, err = reader.Read()
if err != nil {
return errors.Wrap(err, "failed to read header from file")
Expand Down Expand Up @@ -125,7 +121,7 @@ func FeaturizeDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer,
return err
}

func addFeaturesToSchema(meta *metadata.Metadata, mainDR *metadata.DataResource) map[int]*potentialFeature {
func addFeaturesToSchema(meta *metadata.Metadata, mainDR *metadata.DataResource, namePrefix string) map[int]*potentialFeature {
colsToFeaturize := make(map[int]*potentialFeature)
for _, v := range mainDR.Variables {
if v.RefersTo != nil && v.RefersTo["resID"] != nil {
Expand All @@ -137,7 +133,7 @@ func addFeaturesToSchema(meta *metadata.Metadata, mainDR *metadata.DataResource)
// check if needs to be featurized
if res.CanBeFeaturized() {
// create the new resource to hold the featured output
indexName := fmt.Sprintf("_feature_%s", v.Name)
indexName := fmt.Sprintf("%s%s", namePrefix, v.Name)

// add the feature variable
refVariable := &metadata.Variable{
Expand Down
2 changes: 1 addition & 1 deletion featurize_all.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

DATA_DIR=~/datasets/seed_datasets_current
SCHEMA=/datasetDoc.json
SCHEMA=/clustersDatasetDoc.json
OUTPUT_DATA=features/features.csv
OUTPUT_SCHEMA=featuresDatasetDoc.json
DATASET_FOLDER_SUFFIX=_dataset
Expand Down
Loading

0 comments on commit 695411a

Please sign in to comment.