Skip to content

Commit 397f188

Browse files
committed
Merge pull request amplab#211 from alig/master
Improve the RDD <-> Table conversion support
2 parents d3df894 + cec4dac commit 397f188

File tree

9 files changed

+1317
-101
lines changed

9 files changed

+1317
-101
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
#!/usr/bin/python
2+
from string import Template
3+
import sys
4+
from generator_utils import *
5+
6+
## This script generates RDDTable.scala
7+
8+
p = sys.stdout
9+
10+
# e.g. createList(1,3, "T[", "]", ",") gives T[1],T[2],T[3]
11+
def createList(start, stop, prefix, suffix="", sep = ",", newlineAfter = 70, indent = 0):
    """Build a separated list of numbered items, wrapping long lines.

    Every item is prefix + str(n) + suffix for n in start..stop (inclusive),
    and consecutive items are joined with sep. Whenever the current output
    line (separators included) has grown past newlineAfter characters, a
    newline followed by `indent` spaces is inserted after the separator.

    NOTE(review): the indentation of the original was lost in this view; the
    line-length check is assumed to sit inside the `y != stop` branch, so no
    line break is ever emitted after the last item -- confirm against the
    original script.
    """
    pieces = []
    lineLength = 0
    for n in range(start, stop + 1):
        item = prefix + str(n) + suffix
        pieces.append(item)
        lineLength += len(item)
        if n == stop:
            # Last item: no trailing separator and no trailing line break.
            break
        pieces.append(sep)
        lineLength += len(sep)
        if lineLength > newlineAfter:
            pieces.append("\n" + " " * indent)
            lineLength = 0
    return "".join(pieces)
24+
25+
### File header: license, package, import and the opening of object RDDTableImplicits
26+
27+
prefix = """
28+
/*
29+
* Copyright (C) 2012 The Regents of The University California.
30+
* All rights reserved.
31+
*
32+
* Licensed under the Apache License, Version 2.0 (the "License");
33+
* you may not use this file except in compliance with the License.
34+
* You may obtain a copy of the License at
35+
*
36+
* http://www.apache.org/licenses/LICENSE-2.0
37+
*
38+
* Unless required by applicable law or agreed to in writing, software
39+
* distributed under the License is distributed on an "AS IS" BASIS,
40+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
41+
* See the License for the specific language governing permissions and
42+
* limitations under the License.
43+
*/
44+
45+
package shark.api
46+
47+
// *** This file is auto-generated from RDDTable_generator.py ***
48+
49+
import org.apache.spark.rdd.RDD
50+
51+
object RDDTableImplicits {
52+
private type M[T] = ClassManifest[T]
53+
54+
"""
55+
56+
p.write(prefix)
57+
58+
# Emit one implicit conversion rddToTable<N> for each tuple arity N = 2..22
# (the end of range() is exclusive).
# $tmlist becomes "T1: M, ..., TN: M" and $tlist becomes "T1, ..., TN";
# both are wrapped by createList with an indent of 4 spaces.
for x in range(2,23):
59+
60+
tableClass = Template(
61+
"""
62+
implicit def rddToTable$num[$tmlist]
63+
(rdd: RDD[($tlist)]): RDDTableFunctions = RDDTable(rdd)
64+
65+
""").substitute(num = x, tmlist = createList(1, x, "T", ": M", ", ", indent=4), tlist = createList(1, x, "T", "", ", ", indent=4))
66+
# p is sys.stdout, so the generated Scala is written to standard output.
p.write(tableClass)
67+
68+
# Close the RDDTableImplicits object opened by the first preamble and open
# the RDDTable object, declaring the M type alias and the m[T] manifest
# helper that the generated apply methods reference.
prefix = """
69+
}
70+
71+
object RDDTable {
72+
73+
private type M[T] = ClassManifest[T]
74+
private def m[T](implicit m : ClassManifest[T]) = classManifest[T](m)
75+
"""
76+
77+
p.write(prefix)
78+
79+
# Emit one RDDTable.apply overload for each tuple arity N = 2..22
# (the end of range() is exclusive).
# $tmlist becomes "T1: M, ..., TN: M", $tlist becomes "T1, ..., TN" and
# $mtlist becomes "m[T1], ..., m[TN]"; all three are wrapped by createList
# with an indent of 4 spaces.
for x in range(2,23):
80+
81+
tableClass = Template(
82+
"""
83+
def apply[$tmlist]
84+
(rdd: RDD[($tlist)]) = {
85+
val cm = implicitly[Manifest[Seq[Any]]]
86+
val rddSeq: RDD[Seq[_]] = rdd.map(t => t.productIterator.toList.asInstanceOf[Seq[Any]])(cm)
87+
new RDDTableFunctions(rddSeq, Seq($mtlist))
88+
}
89+
90+
""").substitute(tmlist = createList(1, x, "T", ": M", ", ", indent=4), tlist = createList(1, x, "T", "", ", ", indent=4),
91+
mtlist = createList(1, x, "m[T", "]", ", ", indent=4))
92+
# p is sys.stdout, so the generated Scala is written to standard output.
p.write(tableClass)
93+
94+
95+
p.write("}\n")
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/usr/bin/python
2+
from string import Template
3+
import sys
4+
5+
from generator_utils import *
6+
7+
## This script generates functions sqlRdd for SharkContext.scala
8+
9+
p = sys.stdout
10+
11+
# The SharkContext declarations
12+
# Emit one sqlRdd overload for each tuple arity N = 2..22
# (the end of range() is exclusive).
# $list1 becomes "T1: M, ..., TN: M" (wrapped after 80 characters, indent 4),
# $list2 becomes "T1, ..., TN" (indent 4) and $list3 becomes
# "m[T1], ..., m[TN]" (indent 10).
# createList is not defined in this script; presumably it comes from the
# `from generator_utils import *` line above.
for x in range(2,23):
13+
sqlRddFun = Template(
14+
"""
15+
def sqlRdd[$list1](cmd: String):
16+
RDD[Tuple$num[$list2]] = {
17+
new TableRDD$num[$list2](sql2rdd(cmd),
18+
Seq($list3))
19+
}
20+
""").substitute(num = x,
21+
list1 = createList(1, x, "T", ": M", ", ", 80, 4),
22+
list2 = createList(1, x, "T", sep=", ", indent = 4),
23+
list3 = createList(1, x, "m[T", "]", sep=", ", indent = 10))
24+
# p is sys.stdout, so the generated Scala is written to standard output.
p.write(sqlRddFun)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#!/usr/bin/python
2+
from string import Template
3+
import sys
4+
from generator_utils import *
5+
6+
## This script generates TableRDDGenerated.scala
7+
8+
p = sys.stdout
9+
10+
p.write(
11+
"""
12+
/*
13+
* Copyright (C) 2013 The Regents of The University California.
14+
* All rights reserved.
15+
*
16+
* Licensed under the Apache License, Version 2.0 (the "License");
17+
* you may not use this file except in compliance with the License.
18+
* You may obtain a copy of the License at
19+
*
20+
* http://www.apache.org/licenses/LICENSE-2.0
21+
*
22+
* Unless required by applicable law or agreed to in writing, software
23+
* distributed under the License is distributed on an "AS IS" BASIS,
24+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25+
* See the License for the specific language governing permissions and
26+
* limitations under the License.
27+
*/
28+
29+
30+
31+
package shark.api
32+
33+
// *** This file is auto-generated from TableRDDGenerated_generator.py ***
34+
35+
import org.apache.spark.rdd.RDD
36+
import org.apache.spark.{TaskContext, Partition}
37+
38+
class TableSeqRDD(prev: TableRDD)
39+
extends RDD[Seq[Any]](prev) {
40+
41+
def getSchema = prev.schema
42+
43+
override def getPartitions = prev.getPartitions
44+
45+
override def compute(split: Partition, context: TaskContext): Iterator[Seq[Any]] = {
46+
prev.compute(split, context).map( row =>
47+
(0 until prev.schema.size).map(i => row.getPrimitive(i)) )
48+
}
49+
}
50+
51+
""")
52+
53+
# Emit one TableRDD<N> class for each arity N = 1..22
# (the end of range() is exclusive).
for x in range(1,23):
54+
55+
# Build the argument list of the tuple constructor: one
# row.getPrimitiveGeneric[T<y>](<y-1>) call per column. Type parameters are
# numbered from 1 while column indices start at 0. A newline is appended
# after every third argument and a separator string before the first
# argument of each group of three.
inner = ""
56+
for y in range(1,x+1):
57+
if y % 3 == 1: inner += " "
58+
inner += Template(" row.getPrimitiveGeneric[T$num1]($num2)").substitute(num1=y, num2=y-1)
59+
if y != x: inner += ","
60+
if y % 3 == 0: inner += "\n"
61+
# Two closing parentheses: presumably one for the Tuple constructor call and
# one for the map( ... ) call opened in the template below.
inner += " ) )\n"
62+
63+
# $num is the arity, $list the type parameters "T1, ..., TN" and
# $innerfatlist the argument list built above.
tableClass = Template(
64+
"""
65+
class TableRDD$num[$list](prev: TableRDD,
66+
mans: Seq[ClassManifest[_]])
67+
extends RDD[Tuple$num[$list]](prev) {
68+
def schema = prev.schema
69+
70+
private val tableCols = schema.size
71+
require(tableCols == $num, "Table only has " + tableCols + " columns, expecting $num")
72+
73+
mans.zipWithIndex.foreach{ case (m, i) => if (DataTypes.fromManifest(m) != schema(i).dataType)
74+
throw new IllegalArgumentException(
75+
"Type mismatch on column " + (i + 1) + ", expected " + DataTypes.fromManifest(m) + " got " + schema(i).dataType) }
76+
77+
override def getPartitions = prev.getPartitions
78+
79+
override def compute(split: Partition, context: TaskContext):
80+
Iterator[Tuple$num[$list]] = {
81+
prev.compute(split, context).map( row =>
82+
new Tuple$num[$list](
83+
$innerfatlist
84+
}
85+
}
86+
""").substitute(num = x, list = createList(1, x, "T", "", ", ", indent=4), innerfatlist = inner)
87+
88+
89+
# p is sys.stdout, so the generated Scala is written to standard output.
p.write(tableClass)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/usr/bin/python
2+
import sys
3+
4+
# e.g. createList(1,3, "T[", "]", ",") gives T[1],T[2],T[3]
5+
def createList(start, stop, prefix, suffix="", sep = ",", newlineAfter = 70, indent = 0):
    """Return prefix<n>suffix for n = start..stop (inclusive), joined by sep.

    The length of the current output line is tracked, separators included.
    Once it exceeds newlineAfter, a newline plus `indent` spaces is appended
    after the separator and the count restarts at zero.

    NOTE(review): the indentation of the original was lost in this view; the
    wrap check is assumed to apply only when a separator was just written
    (i.e. never after the final item) -- confirm against the original script.
    """
    lineBreak = "\n" + " " * indent
    out = ""
    width = 0
    for number in range(start, stop + 1):
        isLast = (number == stop)
        chunk = prefix + str(number) + suffix
        if not isLast:
            chunk += sep
        out += chunk
        width += len(chunk)
        # Wrap only between items, never after the last one.
        if not isLast and width > newlineAfter:
            out += lineBreak
            width = 0
    return out
18+

0 commit comments

Comments
 (0)