spark-1.5.0-bin-hadoop2.4/bin/spark-shell --driver-memory 120G --executor-memory 120G
// adapted from Joseph Bradley @jkbradley https://gist.github.com/jkbradley/1e3cc0b3116f2f615b3f
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.rdd.RDD
// Paths
val trainDataPath = "spark1hot-train-0.1m.parquet"
val testDataPath = "spark1hot-test-0.1m.parquet"
// Load DataFrame, and convert to RDD of LabeledPoints
def toLP(df: DataFrame): RDD[LabeledPoint] = {
  df.select("label", "features")
    .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
    .repartition(32)   // spread the data across 32 partitions for parallel training
}
val train = toLP(sqlContext.read.parquet(trainDataPath)).cache()
val test = toLP(sqlContext.read.parquet(testDataPath)).cache()
(train.count(), test.count())
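// count() is an action, so it materializes and caches both RDDs up front;
// the training time measured below therefore excludes Parquet I/O.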
// Train model
val numClasses = 2
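// Empty map: all features are treated as continuous (the inputs are
// one-hot encoded, per the "1hot" file names above)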
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 100
val featureSubsetStrategy = "sqrt"   // consider sqrt(#features) candidates at each split
val impurity = "entropy"
val maxDepth = 20
val maxBins = 100
// Time the training run (elapsed is wall-clock seconds)
val now = System.nanoTime
val model = RandomForest.trainClassifier(train, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val elapsed = (System.nanoTime - now) / 1e9
elapsed
// Compute soft predictions (estimated P(label = 1)) by walking each tree down
// to a leaf. For spark.mllib trees this works for binary classification;
// Spark 1.5 adds it for multiclass under the spark.ml API.
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
def softPredict(node: Node, features: Vector): Double = {
  if (node.isLeaf) {
    // Leaf: prob is the probability of the predicted class; flip it if that class is 0
    if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob
  } else {
    if (node.split.get.featureType == Continuous) {
      if (features(node.split.get.feature) <= node.split.get.threshold) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    } else {
      if (node.split.get.categories.contains(features(node.split.get.feature))) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    }
  }
}
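// A quick sanity check (an addition, not in the original script): the per-tree
// scores for a single test point should each lie in [0, 1].
val p0 = test.first()
model.trees.map(tree => softPredict(tree.topNode, p0.features)).foreach(println)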
// Compute AUC
val scoreAndLabels = test.map { point =>
  val score = model.trees.map(tree => softPredict(tree.topNode, point.features)).sum / model.numTrees
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.areaUnderROC()
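// The same metrics object also exposes the area under the precision-recall
// curve, if needed (an addition here, not part of the original benchmark):
metrics.areaUnderPR()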