spark-1.5.0-bin-hadoop2.4/bin/spark-shell --driver-memory 30G --executor-memory 30G
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
// parse a CSV of numeric values (label in the first column) into an RDD of
// LabeledPoints with sparse feature vectors
def csv_to_sparse_labpoint(fname: String): org.apache.spark.rdd.RDD[LabeledPoint] = {
  sc.textFile(fname).map({ line =>
    val vv = line.split(',').map(_.toDouble)
    val label = vv(0)
    val X = vv.slice(1, vv.size)
    // collect the indices and values of the non-zero features
    val n = X.count(_ != 0)
    val X_ids = Array.fill(n){0}
    val X_vals = Array.fill(n){0.0}
    var kk = 0
    for (k <- 0 until X.length) {
      if (X(k) != 0) {
        X_ids(kk) = k
        X_vals(kk) = X(k)
        kk += 1
      }
    }
    val features = Vectors.sparse(X.length, X_ids, X_vals)
    LabeledPoint(label, features)
  })
}
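
// Example: the CSV line "1,0,0,2.5" parses to LabeledPoint(1.0, (3,[2],[2.5])),
// i.e. label 1.0 and a 3-element sparse vector whose only non-zero is 2.5 at index 2.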
val d_train_0 = csv_to_sparse_labpoint("spark-train-10m.csv")
val d_test = csv_to_sparse_labpoint("spark-test-10m.csv")

d_train_0.partitions.size
// repartition so the training data is spread evenly across the cores
val d_train = d_train_0.repartition(32)
d_train.partitions.size

d_train.cache()
d_test.cache()
// count() forces evaluation, so both RDDs are materialized in cache
// before training is timed below
d_test.count()
d_train.count()
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()   // empty: all features numeric
val numTrees = 500
val featureSubsetStrategy = "sqrt"   // try sqrt(#features) at each split
val impurity = "entropy"
val maxDepth = 20
val maxBins = 50
// train the random forest and report elapsed time in seconds
val now = System.nanoTime
val model = RandomForest.trainClassifier(d_train, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
( System.nanoTime - now )/1e9
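
// (Optional) quick sanity check of the fitted ensemble; numTrees and
// totalNumNodes are standard accessors on MLlib tree-ensemble models
model.numTrees
model.totalNumNodes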
// aggregation of probabilities rather than votes
// code by Joseph Bradley @jkbradley
// to be included in the next Spark release
//
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
// walk one decision tree and return the probability of class 1.0
// at the leaf reached by `features`
def softPredict(node: Node, features: Vector): Double = {
  if (node.isLeaf) {
    if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob
  } else {
    if (node.split.get.featureType == Continuous) {
      if (features(node.split.get.feature) <= node.split.get.threshold) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    } else {
      if (node.split.get.categories.contains(features(node.split.get.feature))) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    }
  }
}
def softPredict2(dt: DecisionTreeModel, features: Vector): Double = {
  softPredict(dt.topNode, features)
}
// average the per-tree probabilities across the forest and compute AUC
val scoreAndLabels = d_test.map { point =>
  val score = model.trees.map(tree => softPredict2(tree, point.features)).sum / model.numTrees
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.areaUnderROC()
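
// For comparison, a minimal sketch scoring with the model's built-in
// majority-vote predict() (hard 0/1 labels instead of averaged probabilities);
// AUC from hard votes is typically lower since they carry less ranking
// information. The names hardScoreAndLabels / metrics_hard are illustrative.
val hardScoreAndLabels = d_test.map(point => (model.predict(point.features), point.label))
val metrics_hard = new BinaryClassificationMetrics(hardScoreAndLabels)
metrics_hard.areaUnderROC()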