Skip to content

Commit

Permalink
Workflow test to train a model with features of all feature types (#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
tovbinm authored May 2, 2019
1 parent 6aba38e commit 72dff42
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ gradlew.bat

.env
.gradletasknamecache
.metals

derby.log
metastore_db/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ abstract class Splitter(val uid: String) extends SplitterParams {
require(summary.nonEmpty, "Cannot call validationPrepare until preValidationPrepare has been called")

/**
* Add a splitter parameter to name the label column
* @param label
* @return
*/
* Add a splitter parameter to name the label column
*
* @param label
* @return
*/
def withLabelColumnName(label: String): Splitter = {
if (!isSet(labelColumnName)) {
set(labelColumnName, label)
Expand Down
64 changes: 55 additions & 9 deletions core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ import com.salesforce.op.readers._
import com.salesforce.op.stages.base.unary._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.stages.impl.preparators.SanityChecker
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType
import com.salesforce.op.stages.impl.tuning._
import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest, TestFeatureBuilder}
import com.salesforce.op.testkit.{RandomList, RandomText}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{BooleanParam, ParamMap}
import org.apache.spark.ml.param.BooleanParam
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, StringType}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.joda.time.DateTime
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
Expand All @@ -76,6 +76,7 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
private lazy val workflowLocation2 = tempDir + "/op-workflow-test-model-2-" + DateTime.now().getMillis
private lazy val workflowLocation3 = tempDir + "/op-workflow-test-model-3-" + DateTime.now().getMillis
private lazy val workflowLocation4 = tempDir + "/op-workflow-test-model-4-" + DateTime.now().getMillis
private lazy val workflowLocation5 = tempDir + "/op-workflow-test-model-5-" + DateTime.now().getMillis

Spec[OpWorkflow] should "correctly trace the history of stages needed to create the final output" in {
workflow.getResultFeatures() shouldBe Array(whyNotNormed, weightNormed)
Expand Down Expand Up @@ -495,9 +496,8 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
val rdd = ds.rdd
val f = (f1 + f2 + f3).fillMissingWithMean().zNormalize()
val wf = new OpWorkflow().setResultFeatures(f).setInputRDD(rdd)
val modelLocation = checkpointDir + "/setInputRDD"
wf.train().save(modelLocation)
val scores = wf.loadModel(modelLocation).setInputRDD(rdd).score()
wf.train().save(workflowLocation4)
val scores = wf.loadModel(workflowLocation4).setInputRDD(rdd).score()
scores.collect(f) shouldEqual Seq.fill(3)(0.0.toRealNN)
}

Expand All @@ -509,18 +509,64 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
))
val f = (f1 + f2 + f3).fillMissingWithMean().zNormalize()
val wf = new OpWorkflow().setResultFeatures(f).setInputDataset(ds)
val modelLocation = checkpointDir + "/setInputDataset"
wf.train().save(modelLocation)
val scores = wf.loadModel(modelLocation).setInputDataset(ds).score()
wf.train().save(workflowLocation5)
val scores = wf.loadModel(workflowLocation5).setInputDataset(ds).score()
scores.collect(f) shouldEqual Seq.fill(3)(0.0.toRealNN)
}

it should "train a model with features of all feature types, save, load and score it" in {
  // Generate a random dataset containing one feature of every supported feature type
  val numOfRows = 100
  val (ds, features) = TestFeatureBuilder.random(numOfRows)(
    // HashingTF transformer used in vectorization of text lists does not handle nulls well,
    // therefore setting minLen = 1 for now
    textLists = RandomList.ofTexts(RandomText.strings(0, 10), minLen = 1, maxLen = 10).limit(numOfRows)
  )
  // Prepare the label feature by binarizing the generated RealNN feature.
  // Use getOrElse(fail(...)) rather than .head on the Option: .head relies on the
  // deprecated option2Iterable implicit conversion and would throw an uninformative
  // NoSuchElementException if the feature is ever missing.
  val label = features.find(_.isSubtypeOf[RealNN])
    .getOrElse(fail("No feature of type RealNN was generated"))
    .asInstanceOf[Feature[RealNN]].transformWith(new Labelizer)

  // Transmogrify all the features using default settings
  val featureVector = features.transmogrify()

  // Create a binary classification model selector with a single model type for simplicity
  val prediction = BinaryClassificationModelSelector.withTrainValidationSplit(
    modelsAndParameters = Seq(new OpLogisticRegression() -> new ParamGridBuilder().build())
  ).setInput(label, featureVector).getOutput()

  // Use the generated ID feature as the row key (same Option-safe lookup as for the label)
  val id = features.find(_.isSubtypeOf[ID])
    .getOrElse(fail("No feature of type ID was generated"))
    .asInstanceOf[Feature[ID]].name
  val keyFn = (r: Row) => r.getAs[String](id)
  val workflow = new OpWorkflow().setInputDataset(ds, keyFn).setResultFeatures(prediction)
  // Train, score and save the model
  val model = workflow.train()
  val expectedScoresDF = model.score()
  // Sort by key so that score rows can be compared deterministically across runs
  val expectedScores = expectedScoresDF.select(prediction.name, KeyFieldName).sort(KeyFieldName).collect()
  model.save(workflowLocation)

  // Load and score the model
  val loaded = workflow.loadModel(workflowLocation)
  val scoresDF = loaded.setInputDataset(ds, keyFn).score()
  val scores = scoresDF.select(prediction.name, KeyFieldName).sort(KeyFieldName).collect()

  // Compare the scores produced by the loaded model vs original model
  scores should contain theSameElementsAs expectedScores

  // TODO - once supported, load the model without the workflow and score it as well
  val error = intercept[RuntimeException](OpWorkflowModel.load(workflowLocation))
  error.getMessage should startWith("Failed to load Workflow from path")
}

}

/** Pass-through transformer used by tests that exercise UID handling. */
class NoUidTest extends UnaryTransformer[Real, Real]("blarg", UID[NoUidTest]) {
  // Identity transform: the input value is returned untouched
  def transformFn: Real => Real = r => r
}

/**
 * Turns a real-valued feature into a binary label:
 * strictly positive values map to 1.0, everything else (including missing) to 0.0.
 */
class Labelizer(uid: String = UID[Labelizer]) extends UnaryTransformer[RealNN, RealNN]("labelizer", uid) {
  // Mark the output as a response so downstream stages treat it as the label
  override def outputIsResponse: Boolean = true
  def transformFn: RealNN => RealNN = v => {
    val binarized = v.value.map { raw => if (raw > 0.0) 1.0 else 0.0 }
    binarized.toRealNN(0.0)
  }
}

class NormEstimatorTest[I <: Real](uid: String = UID[NormEstimatorTest[_]])
(implicit tti: TypeTag[I], ttiv: TypeTag[I#Value])
extends UnaryEstimator[I, Real](operationName = "minMaxNorm", uid = uid) {
Expand Down

0 comments on commit 72dff42

Please sign in to comment.